diff --git a/controllers/cassandra_controller.go b/controllers/cassandra_controller.go index 7095af895..693f7c132 100644 --- a/controllers/cassandra_controller.go +++ b/controllers/cassandra_controller.go @@ -8,6 +8,8 @@ import ( "strings" "github.com/aiven/aiven-go-client/v2" + avngen "github.com/aiven/go-client-codegen" + "github.com/aiven/go-client-codegen/handler/service" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ctrl "sigs.k8s.io/controller-runtime" @@ -63,7 +65,7 @@ func (a *cassandraAdapter) getServiceCommonSpec() *v1alpha1.ServiceCommonSpec { } func (a *cassandraAdapter) getUserConfig() any { - return &a.Spec.UserConfig + return a.Spec.UserConfig } func (a *cassandraAdapter) newSecret(ctx context.Context, s *aiven.Service) (*corev1.Secret, error) { @@ -86,3 +88,7 @@ func (a *cassandraAdapter) getServiceType() string { func (a *cassandraAdapter) getDiskSpace() string { return a.Spec.DiskSpace } + +func (a *cassandraAdapter) performUpgradeTaskIfNeeded(ctx context.Context, avn *aiven.Client, avnGen avngen.Client, old *service.ServiceGetOut) error { + return nil +} diff --git a/controllers/clickhouse_controller.go b/controllers/clickhouse_controller.go index 18633ad54..8d93b8217 100644 --- a/controllers/clickhouse_controller.go +++ b/controllers/clickhouse_controller.go @@ -7,6 +7,8 @@ import ( "fmt" "github.com/aiven/aiven-go-client/v2" + avngen "github.com/aiven/go-client-codegen" + "github.com/aiven/go-client-codegen/handler/service" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ctrl "sigs.k8s.io/controller-runtime" @@ -62,7 +64,7 @@ func (a *clickhouseAdapter) getServiceCommonSpec() *v1alpha1.ServiceCommonSpec { } func (a *clickhouseAdapter) getUserConfig() any { - return &a.Spec.UserConfig + return a.Spec.UserConfig } func (a *clickhouseAdapter) newSecret(ctx context.Context, s *aiven.Service) (*corev1.Secret, error) { @@ -89,3 +91,7 @@ func (a *clickhouseAdapter) getServiceType() string { func (a *clickhouseAdapter) getDiskSpace() string { return a.Spec.DiskSpace } + +func (a *clickhouseAdapter) performUpgradeTaskIfNeeded(ctx context.Context, avn *aiven.Client, avnGen avngen.Client, old *service.ServiceGetOut) error { + return nil +} diff --git a/controllers/generic_service_handler.go b/controllers/generic_service_handler.go index ec54f2137..c45fd0f67 100644 --- a/controllers/generic_service_handler.go +++ b/controllers/generic_service_handler.go @@ -7,6 +7,7 @@ import ( "github.com/aiven/aiven-go-client/v2" avngen "github.com/aiven/go-client-codegen" + "github.com/aiven/go-client-codegen/handler/service" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -43,7 +44,7 @@ func (h *genericServiceHandler) createOrUpdate(ctx context.Context, avn *aiven.C } } - _, err = avn.Services.Get(ctx, spec.Project, ometa.Name) + oldService, err := avnGen.ServiceGet(ctx, spec.Project, ometa.Name) exists := err == nil if !exists && !aiven.IsNotFound(err) { return fmt.Errorf("failed to fetch service: %w", err) @@ -96,6 +97,12 @@ func (h *genericServiceHandler) createOrUpdate(ctx context.Context, avn *aiven.C return err } + // Perform upgrade task if necessary (at the moment, this is relevant only for PostgreSQL) + err = o.performUpgradeTaskIfNeeded(ctx, avn, avnGen, oldService) + if err != nil { + return err + } + req := aiven.UpdateServiceRequest{ Cloud: spec.CloudName, DiskSpaceMB: v1alpha1.ConvertDiscSpace(o.getDiskSpace()), @@ -238,4 +245,5 @@ type serviceAdapter interface { getDiskSpace() string getUserConfig() any newSecret(ctx context.Context, s *aiven.Service) (*corev1.Secret, error) + performUpgradeTaskIfNeeded(ctx context.Context, avn *aiven.Client, avnGen avngen.Client, old *service.ServiceGetOut) error } diff --git a/controllers/grafana_controller.go b/controllers/grafana_controller.go index 239977edf..e6344eab4 100644 --- a/controllers/grafana_controller.go +++ b/controllers/grafana_controller.go @@ -8,6 +8,8 @@ import ( "strings" "github.com/aiven/aiven-go-client/v2" + avngen "github.com/aiven/go-client-codegen" + "github.com/aiven/go-client-codegen/handler/service" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ctrl "sigs.k8s.io/controller-runtime" @@ -63,7 +65,7 @@ func (a *grafanaAdapter) getServiceCommonSpec() *v1alpha1.ServiceCommonSpec { } func (a *grafanaAdapter) getUserConfig() any { - return &a.Spec.UserConfig + return a.Spec.UserConfig } func (a *grafanaAdapter) newSecret(ctx context.Context, s *aiven.Service) (*corev1.Secret, error) { @@ -86,3 +88,7 @@ func (a *grafanaAdapter) getServiceType() string { func (a *grafanaAdapter) getDiskSpace() string { return a.Spec.DiskSpace } + +func (a *grafanaAdapter) performUpgradeTaskIfNeeded(ctx context.Context, avn *aiven.Client, avnGen avngen.Client, old *service.ServiceGetOut) error { + return nil +} diff --git a/controllers/kafka_controller.go b/controllers/kafka_controller.go index f44e2959c..b090c7b07 100644 --- a/controllers/kafka_controller.go +++ b/controllers/kafka_controller.go @@ -8,6 +8,8 @@ import ( "strconv" "github.com/aiven/aiven-go-client/v2" + avngen "github.com/aiven/go-client-codegen" + "github.com/aiven/go-client-codegen/handler/service" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ctrl "sigs.k8s.io/controller-runtime" @@ -63,7 +65,7 @@ func (a *kafkaAdapter) getServiceCommonSpec() *v1alpha1.ServiceCommonSpec { } func (a *kafkaAdapter) getUserConfig() any { - return &a.Spec.UserConfig + return a.Spec.UserConfig } func (a *kafkaAdapter) newSecret(ctx context.Context, s *aiven.Service) (*corev1.Secret, error) { @@ -121,3 +123,7 @@ func (a *kafkaAdapter) getServiceType() string { func (a *kafkaAdapter) getDiskSpace() string { return a.Spec.DiskSpace } + +func (a *kafkaAdapter) performUpgradeTaskIfNeeded(ctx context.Context, avn *aiven.Client, avnGen avngen.Client, old *service.ServiceGetOut) error { + return nil +} diff --git a/controllers/kafkaconnect_controller.go b/controllers/kafkaconnect_controller.go index cdf4bb799..d0483aee2 100644 --- a/controllers/kafkaconnect_controller.go +++ b/controllers/kafkaconnect_controller.go @@ -7,6 +7,8 @@ import ( "fmt" "github.com/aiven/aiven-go-client/v2" + avngen "github.com/aiven/go-client-codegen" + "github.com/aiven/go-client-codegen/handler/service" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ctrl "sigs.k8s.io/controller-runtime" @@ -60,7 +62,7 @@ func (a *kafkaConnectAdapter) getServiceCommonSpec() *v1alpha1.ServiceCommonSpec } func (a *kafkaConnectAdapter) getUserConfig() any { - return &a.Spec.UserConfig + return a.Spec.UserConfig } func (a *kafkaConnectAdapter) newSecret(ctx context.Context, s *aiven.Service) (*corev1.Secret, error) { @@ -78,3 +80,7 @@ func (a *kafkaConnectAdapter) getDiskSpace() string { func (a *kafkaConnectAdapter) GetConnInfoSecretTarget() v1alpha1.ConnInfoSecretTarget { return v1alpha1.ConnInfoSecretTarget{} } + +func (a *kafkaConnectAdapter) performUpgradeTaskIfNeeded(ctx context.Context, avn *aiven.Client, avnGen avngen.Client, old *service.ServiceGetOut) error { + return nil +} diff --git a/controllers/mysql_controller.go b/controllers/mysql_controller.go index ea3eb5857..e857e81a3 100644 --- a/controllers/mysql_controller.go +++ b/controllers/mysql_controller.go @@ -7,6 +7,8 @@ import ( "fmt" "github.com/aiven/aiven-go-client/v2" + avngen "github.com/aiven/go-client-codegen" + "github.com/aiven/go-client-codegen/handler/service" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ctrl "sigs.k8s.io/controller-runtime" @@ -62,7 +64,7 @@ func (a *mySQLAdapter) getServiceCommonSpec() *v1alpha1.ServiceCommonSpec { } func (a *mySQLAdapter) getUserConfig() any { - return &a.Spec.UserConfig + return a.Spec.UserConfig } func (a *mySQLAdapter) newSecret(ctx context.Context, s *aiven.Service) (*corev1.Secret, error) { @@ -87,3 +89,7 @@ func (a *mySQLAdapter) getServiceType() string { func (a *mySQLAdapter) getDiskSpace() string { return a.Spec.DiskSpace } + +func (a *mySQLAdapter) performUpgradeTaskIfNeeded(ctx context.Context, avn *aiven.Client, avnGen avngen.Client, old *service.ServiceGetOut) error { + return nil +} diff --git a/controllers/opensearch_controller.go b/controllers/opensearch_controller.go index fd83a5aa1..1fe2f4ea8 100644 --- a/controllers/opensearch_controller.go +++ b/controllers/opensearch_controller.go @@ -7,6 +7,8 @@ import ( "fmt" "github.com/aiven/aiven-go-client/v2" + avngen "github.com/aiven/go-client-codegen" + "github.com/aiven/go-client-codegen/handler/service" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ctrl "sigs.k8s.io/controller-runtime" @@ -64,7 +66,7 @@ func (a *opensearchAdapter) getServiceCommonSpec() *v1alpha1.ServiceCommonSpec { } func (a *opensearchAdapter) getUserConfig() any { - return &a.Spec.UserConfig + return a.Spec.UserConfig } func (a *opensearchAdapter) newSecret(ctx context.Context, s *aiven.Service) (*corev1.Secret, error) { @@ -91,3 +93,7 @@ func (a *opensearchAdapter) getServiceType() string { func (a *opensearchAdapter) getDiskSpace() string { return a.Spec.DiskSpace } + +func (a *opensearchAdapter) performUpgradeTaskIfNeeded(ctx context.Context, avn *aiven.Client, avnGen avngen.Client, old *service.ServiceGetOut) error { + return nil +} diff --git a/controllers/postgresql_controller.go b/controllers/postgresql_controller.go index d3ce2cdec..58ff8b2f9 100644 --- a/controllers/postgresql_controller.go +++ b/controllers/postgresql_controller.go @@ -5,14 +5,18 @@ package controllers import ( "context" "fmt" + "time" "github.com/aiven/aiven-go-client/v2" + avngen "github.com/aiven/go-client-codegen" + "github.com/aiven/go-client-codegen/handler/service" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "github.com/aiven/aiven-operator/api/v1alpha1" + pguserconfig "github.com/aiven/aiven-operator/api/v1alpha1/userconfig/service/pg" ) // PostgreSQLReconciler reconciles a PostgreSQL object @@ -20,6 +24,8 @@ type PostgreSQLReconciler struct { Controller } +const waitForTaskToCompleteInterval = time.Second * 10 + //+kubebuilder:rbac:groups=aiven.io,resources=postgresqls,verbs=get;list;watch;create;update;patch;delete //+kubebuilder:rbac:groups=aiven.io,resources=postgresqls/status,verbs=get;update;patch //+kubebuilder:rbac:groups=aiven.io,resources=postgresqls/finalizers,verbs=get;list;watch;create;update;patch;delete @@ -61,7 +67,7 @@ func (a *postgreSQLAdapter) getServiceCommonSpec() *v1alpha1.ServiceCommonSpec { } func (a *postgreSQLAdapter) getUserConfig() any { - return &a.Spec.UserConfig + return a.Spec.UserConfig } func (a *postgreSQLAdapter) newSecret(ctx context.Context, s *aiven.Service) (*corev1.Secret, error) { @@ -94,3 +100,59 @@ func (a *postgreSQLAdapter) getServiceType() string { func (a *postgreSQLAdapter) getDiskSpace() string { return a.Spec.DiskSpace } + +func (a *postgreSQLAdapter) performUpgradeTaskIfNeeded(ctx context.Context, avn *aiven.Client, avnGen avngen.Client, old *service.ServiceGetOut) error { + var currentVersion string = old.UserConfig["pg_version"].(string) + var targetVersion string = *a.getUserConfig().(pguserconfig.PgUserConfig).PgVersion + + // No need to upgrade if pg_version hasn't changed + if targetVersion == currentVersion { + return nil + } + + task, err := avnGen.ServiceTaskCreate(ctx, a.getServiceCommonSpec().Project, a.getObjectMeta().Name, &service.ServiceTaskCreateIn{ + TargetVersion: service.TargetVersionType(targetVersion), + TaskType: service.TaskTypeUpgradeCheck, + }) + if err != nil { + return fmt.Errorf("cannot create PG upgrade check task: %q", err) + } + + didTaskSucceed, err := waitForTaskToComplete(ctx, func() (*bool, error) { + t, getErr := avn.ServiceTask.Get(ctx, a.getServiceCommonSpec().Project, a.getObjectMeta().Name, task.TaskId) + if getErr != nil { + return nil, fmt.Errorf("error fetching service task %s: %q", t.Task.Id, getErr) + } + + if t.Task.Success == nil { + return nil, nil + } + return t.Task.Success, nil + }) + if err != nil { + return err + } + + if !*didTaskSucceed { + return fmt.Errorf("PG service upgrade check error, version upgrade from %s to %s, result: %s", currentVersion, targetVersion, task.Result) + } + return nil +} + +func waitForTaskToComplete[T any](ctx context.Context, f func() (*T, error)) (*T, error) { + var err error + for { + select { + case <-ctx.Done(): + return nil, fmt.Errorf("context timeout while retrying operation, error=%q", err) + case <-time.After(waitForTaskToCompleteInterval): + val, err := f() + if err != nil { + return nil, err + } + if val != nil { + return val, nil + } + } + } +} diff --git a/controllers/redis_controller.go b/controllers/redis_controller.go index 58b179b1e..397093927 100644 --- a/controllers/redis_controller.go +++ b/controllers/redis_controller.go @@ -7,6 +7,8 @@ import ( "fmt" "github.com/aiven/aiven-go-client/v2" + avngen "github.com/aiven/go-client-codegen" + "github.com/aiven/go-client-codegen/handler/service" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ctrl "sigs.k8s.io/controller-runtime" @@ -64,7 +66,7 @@ func (a *redisAdapter) getServiceCommonSpec() *v1alpha1.ServiceCommonSpec { } func (a *redisAdapter) getUserConfig() any { - return &a.Spec.UserConfig + return a.Spec.UserConfig } func (a *redisAdapter) newSecret(ctx context.Context, s *aiven.Service) (*corev1.Secret, error) { @@ -93,3 +95,7 @@ func (a *redisAdapter) getServiceType() string { func (a *redisAdapter) getDiskSpace() string { return a.Spec.DiskSpace } + +func (a *redisAdapter) performUpgradeTaskIfNeeded(ctx context.Context, avn *aiven.Client, avnGen avngen.Client, old *service.ServiceGetOut) error { + return nil +} diff --git a/tests/postgresql_test.go b/tests/postgresql_test.go index 6c0b1982a..6eace053d 100644 --- a/tests/postgresql_test.go +++ b/tests/postgresql_test.go @@ -5,8 +5,11 @@ import ( "fmt" "testing" + "github.com/aiven/aiven-go-client/v2" + "github.com/aiven/go-client-codegen/handler/service" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "k8s.io/apimachinery/pkg/types" "github.com/aiven/aiven-operator/api/v1alpha1" ) @@ -247,3 +250,80 @@ func TestPgCustomPrefix(t *testing.T) { assert.NotEmpty(t, secret.Data["MY_PG_DATABASE_URI"]) assert.NotEmpty(t, secret.Data["MY_PG_CA_CERT"]) } + +func getPgUpgradeVersionYaml(project, pgName, cloudName, version string) string { + return fmt.Sprintf(` + apiVersion: aiven.io/v1alpha1 + kind: PostgreSQL + metadata: + name: %[2]s + spec: + authSecretRef: + name: aiven-token + key: token + project: %[1]s + cloudName: %[3]s + plan: startup-4 + userConfig: + pg_version: "%[4]s" +`, project, pgName, cloudName, version) +} + +func TestPgUpgradeVersion(t *testing.T) { + t.Parallel() + defer recoverPanic(t) + + pgVersions := service.TargetVersionTypeChoices() + startingVersion := pgVersions[len(pgVersions)-2] + targetVersion := pgVersions[len(pgVersions)-1] + + // GIVEN + ctx := context.Background() + pgName := randName("secret-prefix") + yaml := getPgUpgradeVersionYaml(testProject, pgName, testPrimaryCloudName, startingVersion) + s := NewSession(k8sClient, avnClient, testProject) + + // Cleans test afterwards + defer s.Destroy() + + // WHEN + // Applies given manifest + require.NoError(t, s.Apply(yaml)) + + // Waits kube objects + pg := new(v1alpha1.PostgreSQL) + require.NoError(t, s.GetRunning(pg, pgName)) + + // THEN + // Validates instance + pgAvn, err := avnClient.Services.Get(ctx, testProject, pgName) + require.NoError(t, err) + // Tests non-strict yaml. By sending string-integer we expect it's parsed as a string. + // Default version is 15, we get 14, as we set it. + assert.Equal(t, startingVersion, pgAvn.UserConfig["pg_version"]) + assert.Equal(t, anyPointer(startingVersion), pg.Spec.UserConfig.PgVersion) + + // UserConfig test + require.NotNil(t, pg.Spec.UserConfig) + assert.NotNil(t, pgAvn.UserConfig) // "Aiven instance has defaults set" + + updatedYaml := getPgUpgradeVersionYaml(testProject, pgName, testPrimaryCloudName, targetVersion) + require.NoError(t, s.Apply(updatedYaml)) + + // Verify that the service was upgraded successfully + var pgAvnUpd *aiven.Service + require.NoError(t, retryForever(ctx, "check that PG version was upgraded", func() (bool, error) { + pgAvnUpd, err = avnClient.Services.Get(ctx, testProject, pgName) + if err != nil { + return false, err + } + + // Just waits object being updated in Aiven + return pgAvnUpd.UserConfig["pg_version"] != startingVersion, nil + })) + + // Gets kube object + pgUpd := new(v1alpha1.PostgreSQL) + require.NoError(t, k8sClient.Get(ctx, types.NamespacedName{Name: pgName, Namespace: "default"}, pgUpd)) + assert.Equal(t, &targetVersion, *pgUpd.Spec.UserConfig.PgVersion) +}