Skip to content

Commit

Permalink
feat: check PG upgrade task before attempting to upgrade service
Browse files Browse the repository at this point in the history
  • Loading branch information
jeff-held-aiven committed Feb 23, 2024
1 parent 25a2ba7 commit c41bcb4
Show file tree
Hide file tree
Showing 11 changed files with 208 additions and 10 deletions.
8 changes: 7 additions & 1 deletion controllers/cassandra_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"strings"

"github.com/aiven/aiven-go-client/v2"
avngen "github.com/aiven/go-client-codegen"
"github.com/aiven/go-client-codegen/handler/service"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
ctrl "sigs.k8s.io/controller-runtime"
Expand Down Expand Up @@ -63,7 +65,7 @@ func (a *cassandraAdapter) getServiceCommonSpec() *v1alpha1.ServiceCommonSpec {
}

func (a *cassandraAdapter) getUserConfig() any {
return &a.Spec.UserConfig
return a.Spec.UserConfig
}

func (a *cassandraAdapter) newSecret(ctx context.Context, s *aiven.Service) (*corev1.Secret, error) {
Expand All @@ -86,3 +88,7 @@ func (a *cassandraAdapter) getServiceType() string {
func (a *cassandraAdapter) getDiskSpace() string {
return a.Spec.DiskSpace
}

func (a *cassandraAdapter) performUpgradeTaskIfNeeded(ctx context.Context, avn *aiven.Client, avnGen avngen.Client, old *service.ServiceGetOut) error {
return nil
}
8 changes: 7 additions & 1 deletion controllers/clickhouse_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"fmt"

"github.com/aiven/aiven-go-client/v2"
avngen "github.com/aiven/go-client-codegen"
"github.com/aiven/go-client-codegen/handler/service"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
ctrl "sigs.k8s.io/controller-runtime"
Expand Down Expand Up @@ -62,7 +64,7 @@ func (a *clickhouseAdapter) getServiceCommonSpec() *v1alpha1.ServiceCommonSpec {
}

func (a *clickhouseAdapter) getUserConfig() any {
return &a.Spec.UserConfig
return a.Spec.UserConfig
}

func (a *clickhouseAdapter) newSecret(ctx context.Context, s *aiven.Service) (*corev1.Secret, error) {
Expand All @@ -89,3 +91,7 @@ func (a *clickhouseAdapter) getServiceType() string {
func (a *clickhouseAdapter) getDiskSpace() string {
return a.Spec.DiskSpace
}

func (a *clickhouseAdapter) performUpgradeTaskIfNeeded(ctx context.Context, avn *aiven.Client, avnGen avngen.Client, old *service.ServiceGetOut) error {
return nil
}
10 changes: 9 additions & 1 deletion controllers/generic_service_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/aiven/aiven-go-client/v2"
avngen "github.com/aiven/go-client-codegen"
"github.com/aiven/go-client-codegen/handler/service"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -43,7 +44,7 @@ func (h *genericServiceHandler) createOrUpdate(ctx context.Context, avn *aiven.C
}
}

_, err = avn.Services.Get(ctx, spec.Project, ometa.Name)
oldService, err := avnGen.ServiceGet(ctx, spec.Project, ometa.Name)
exists := err == nil
if !exists && !aiven.IsNotFound(err) {
return fmt.Errorf("failed to fetch service: %w", err)
Expand Down Expand Up @@ -96,6 +97,12 @@ func (h *genericServiceHandler) createOrUpdate(ctx context.Context, avn *aiven.C
return err
}

// Perform upgrade task if necessary (at the moment, this is relevant only for PostgreSQL)
err = o.performUpgradeTaskIfNeeded(ctx, avn, avnGen, oldService)
if err != nil {
return err
}

req := aiven.UpdateServiceRequest{
Cloud: spec.CloudName,
DiskSpaceMB: v1alpha1.ConvertDiscSpace(o.getDiskSpace()),
Expand Down Expand Up @@ -238,4 +245,5 @@ type serviceAdapter interface {
getDiskSpace() string
getUserConfig() any
newSecret(ctx context.Context, s *aiven.Service) (*corev1.Secret, error)
performUpgradeTaskIfNeeded(ctx context.Context, avn *aiven.Client, avnGen avngen.Client, old *service.ServiceGetOut) error
}
8 changes: 7 additions & 1 deletion controllers/grafana_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"strings"

"github.com/aiven/aiven-go-client/v2"
avngen "github.com/aiven/go-client-codegen"
"github.com/aiven/go-client-codegen/handler/service"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
ctrl "sigs.k8s.io/controller-runtime"
Expand Down Expand Up @@ -63,7 +65,7 @@ func (a *grafanaAdapter) getServiceCommonSpec() *v1alpha1.ServiceCommonSpec {
}

func (a *grafanaAdapter) getUserConfig() any {
return &a.Spec.UserConfig
return a.Spec.UserConfig
}

func (a *grafanaAdapter) newSecret(ctx context.Context, s *aiven.Service) (*corev1.Secret, error) {
Expand All @@ -86,3 +88,7 @@ func (a *grafanaAdapter) getServiceType() string {
func (a *grafanaAdapter) getDiskSpace() string {
return a.Spec.DiskSpace
}

func (a *grafanaAdapter) performUpgradeTaskIfNeeded(ctx context.Context, avn *aiven.Client, avnGen avngen.Client, old *service.ServiceGetOut) error {
return nil
}
8 changes: 7 additions & 1 deletion controllers/kafka_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"strconv"

"github.com/aiven/aiven-go-client/v2"
avngen "github.com/aiven/go-client-codegen"
"github.com/aiven/go-client-codegen/handler/service"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
ctrl "sigs.k8s.io/controller-runtime"
Expand Down Expand Up @@ -63,7 +65,7 @@ func (a *kafkaAdapter) getServiceCommonSpec() *v1alpha1.ServiceCommonSpec {
}

func (a *kafkaAdapter) getUserConfig() any {
return &a.Spec.UserConfig
return a.Spec.UserConfig
}

func (a *kafkaAdapter) newSecret(ctx context.Context, s *aiven.Service) (*corev1.Secret, error) {
Expand Down Expand Up @@ -121,3 +123,7 @@ func (a *kafkaAdapter) getServiceType() string {
func (a *kafkaAdapter) getDiskSpace() string {
return a.Spec.DiskSpace
}

func (a *kafkaAdapter) performUpgradeTaskIfNeeded(ctx context.Context, avn *aiven.Client, avnGen avngen.Client, old *service.ServiceGetOut) error {
return nil
}
8 changes: 7 additions & 1 deletion controllers/kafkaconnect_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"fmt"

"github.com/aiven/aiven-go-client/v2"
avngen "github.com/aiven/go-client-codegen"
"github.com/aiven/go-client-codegen/handler/service"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
ctrl "sigs.k8s.io/controller-runtime"
Expand Down Expand Up @@ -60,7 +62,7 @@ func (a *kafkaConnectAdapter) getServiceCommonSpec() *v1alpha1.ServiceCommonSpec
}

func (a *kafkaConnectAdapter) getUserConfig() any {
return &a.Spec.UserConfig
return a.Spec.UserConfig
}

func (a *kafkaConnectAdapter) newSecret(ctx context.Context, s *aiven.Service) (*corev1.Secret, error) {
Expand All @@ -78,3 +80,7 @@ func (a *kafkaConnectAdapter) getDiskSpace() string {
func (a *kafkaConnectAdapter) GetConnInfoSecretTarget() v1alpha1.ConnInfoSecretTarget {
return v1alpha1.ConnInfoSecretTarget{}
}

func (a *kafkaConnectAdapter) performUpgradeTaskIfNeeded(ctx context.Context, avn *aiven.Client, avnGen avngen.Client, old *service.ServiceGetOut) error {
return nil
}
8 changes: 7 additions & 1 deletion controllers/mysql_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"fmt"

"github.com/aiven/aiven-go-client/v2"
avngen "github.com/aiven/go-client-codegen"
"github.com/aiven/go-client-codegen/handler/service"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
ctrl "sigs.k8s.io/controller-runtime"
Expand Down Expand Up @@ -62,7 +64,7 @@ func (a *mySQLAdapter) getServiceCommonSpec() *v1alpha1.ServiceCommonSpec {
}

func (a *mySQLAdapter) getUserConfig() any {
return &a.Spec.UserConfig
return a.Spec.UserConfig
}

func (a *mySQLAdapter) newSecret(ctx context.Context, s *aiven.Service) (*corev1.Secret, error) {
Expand All @@ -87,3 +89,7 @@ func (a *mySQLAdapter) getServiceType() string {
func (a *mySQLAdapter) getDiskSpace() string {
return a.Spec.DiskSpace
}

func (a *mySQLAdapter) performUpgradeTaskIfNeeded(ctx context.Context, avn *aiven.Client, avnGen avngen.Client, old *service.ServiceGetOut) error {
return nil
}
8 changes: 7 additions & 1 deletion controllers/opensearch_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"fmt"

"github.com/aiven/aiven-go-client/v2"
avngen "github.com/aiven/go-client-codegen"
"github.com/aiven/go-client-codegen/handler/service"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
ctrl "sigs.k8s.io/controller-runtime"
Expand Down Expand Up @@ -64,7 +66,7 @@ func (a *opensearchAdapter) getServiceCommonSpec() *v1alpha1.ServiceCommonSpec {
}

func (a *opensearchAdapter) getUserConfig() any {
return &a.Spec.UserConfig
return a.Spec.UserConfig
}

func (a *opensearchAdapter) newSecret(ctx context.Context, s *aiven.Service) (*corev1.Secret, error) {
Expand All @@ -91,3 +93,7 @@ func (a *opensearchAdapter) getServiceType() string {
func (a *opensearchAdapter) getDiskSpace() string {
return a.Spec.DiskSpace
}

func (a *opensearchAdapter) performUpgradeTaskIfNeeded(ctx context.Context, avn *aiven.Client, avnGen avngen.Client, old *service.ServiceGetOut) error {
return nil
}
64 changes: 63 additions & 1 deletion controllers/postgresql_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,27 @@ package controllers
import (
"context"
"fmt"
"time"

"github.com/aiven/aiven-go-client/v2"
avngen "github.com/aiven/go-client-codegen"
"github.com/aiven/go-client-codegen/handler/service"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/aiven/aiven-operator/api/v1alpha1"
pguserconfig "github.com/aiven/aiven-operator/api/v1alpha1/userconfig/service/pg"
)

// PostgreSQLReconciler reconciles a PostgreSQL object
type PostgreSQLReconciler struct {
Controller
}

const waitForTaskToCompleteInterval = time.Second * 10

//+kubebuilder:rbac:groups=aiven.io,resources=postgresqls,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=aiven.io,resources=postgresqls/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=aiven.io,resources=postgresqls/finalizers,verbs=get;list;watch;create;update;patch;delete
Expand Down Expand Up @@ -61,7 +67,7 @@ func (a *postgreSQLAdapter) getServiceCommonSpec() *v1alpha1.ServiceCommonSpec {
}

func (a *postgreSQLAdapter) getUserConfig() any {
return &a.Spec.UserConfig
return a.Spec.UserConfig
}

func (a *postgreSQLAdapter) newSecret(ctx context.Context, s *aiven.Service) (*corev1.Secret, error) {
Expand Down Expand Up @@ -94,3 +100,59 @@ func (a *postgreSQLAdapter) getServiceType() string {
func (a *postgreSQLAdapter) getDiskSpace() string {
return a.Spec.DiskSpace
}

func (a *postgreSQLAdapter) performUpgradeTaskIfNeeded(ctx context.Context, avn *aiven.Client, avnGen avngen.Client, old *service.ServiceGetOut) error {
var currentVersion string = old.UserConfig["pg_version"].(string)
var targetVersion string = *a.getUserConfig().(pguserconfig.PgUserConfig).PgVersion

// No need to upgrade if pg_version hasn't changed
if targetVersion == currentVersion {
return nil
}

task, err := avnGen.ServiceTaskCreate(ctx, a.getServiceCommonSpec().Project, a.getObjectMeta().Name, &service.ServiceTaskCreateIn{
TargetVersion: service.TargetVersionType(targetVersion),
TaskType: service.TaskTypeUpgradeCheck,
})
if err != nil {
return fmt.Errorf("cannot create PG upgrade check task: %q", err)
}

didTaskSucceed, err := waitForTaskToComplete(ctx, func() (*bool, error) {
t, getErr := avn.ServiceTask.Get(ctx, a.getServiceCommonSpec().Project, a.getObjectMeta().Name, task.TaskId)
if getErr != nil {
return nil, fmt.Errorf("error fetching service task %s: %q", t.Task.Id, getErr)
}

if t.Task.Success == nil {
return nil, nil
}
return t.Task.Success, nil
})
if err != nil {
return err
}

if !*didTaskSucceed {
return fmt.Errorf("PG service upgrade check error, version upgrade from %s to %s, result: %s", currentVersion, targetVersion, task.Result)
}
return nil
}

func waitForTaskToComplete[T any](ctx context.Context, f func() (*T, error)) (*T, error) {
var err error
for {
select {
case <-ctx.Done():
return nil, fmt.Errorf("context timeout while retrying operation, error=%q", err)
case <-time.After(waitForTaskToCompleteInterval):
val, err := f()
if err != nil {
return nil, err
}
if val != nil {
return val, nil
}
}
}
}
8 changes: 7 additions & 1 deletion controllers/redis_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"fmt"

"github.com/aiven/aiven-go-client/v2"
avngen "github.com/aiven/go-client-codegen"
"github.com/aiven/go-client-codegen/handler/service"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
ctrl "sigs.k8s.io/controller-runtime"
Expand Down Expand Up @@ -64,7 +66,7 @@ func (a *redisAdapter) getServiceCommonSpec() *v1alpha1.ServiceCommonSpec {
}

func (a *redisAdapter) getUserConfig() any {
return &a.Spec.UserConfig
return a.Spec.UserConfig
}

func (a *redisAdapter) newSecret(ctx context.Context, s *aiven.Service) (*corev1.Secret, error) {
Expand Down Expand Up @@ -93,3 +95,7 @@ func (a *redisAdapter) getServiceType() string {
func (a *redisAdapter) getDiskSpace() string {
return a.Spec.DiskSpace
}

func (a *redisAdapter) performUpgradeTaskIfNeeded(ctx context.Context, avn *aiven.Client, avnGen avngen.Client, old *service.ServiceGetOut) error {
return nil
}
Loading

0 comments on commit c41bcb4

Please sign in to comment.