diff --git a/CHANGELOG.md b/CHANGELOG.md index e0abf753..a0417c4c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,7 @@ ## [MAJOR.MINOR.PATCH] - YYYY-MM-DD +- Perform upgrade tasks to check if PG service can be upgraded before updating service - Expose project CA certificate to service secrets: `REDIS_CA_CERT`, `MYSQL_CA_CERT`, etc. - Add `KafkaTopic` field `config.local_retention_bytes`, type `integer`: local.retention.bytes value - Add `KafkaTopic` field `config.local_retention_ms`, type `integer`: local.retention.ms value diff --git a/controllers/cassandra_controller.go b/controllers/cassandra_controller.go index 7095af89..d66d60ee 100644 --- a/controllers/cassandra_controller.go +++ b/controllers/cassandra_controller.go @@ -8,6 +8,7 @@ import ( "strings" "github.com/aiven/aiven-go-client/v2" + avngen "github.com/aiven/go-client-codegen" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ctrl "sigs.k8s.io/controller-runtime" @@ -63,7 +64,7 @@ func (a *cassandraAdapter) getServiceCommonSpec() *v1alpha1.ServiceCommonSpec { } func (a *cassandraAdapter) getUserConfig() any { - return &a.Spec.UserConfig + return a.Spec.UserConfig } func (a *cassandraAdapter) newSecret(ctx context.Context, s *aiven.Service) (*corev1.Secret, error) { @@ -86,3 +87,7 @@ func (a *cassandraAdapter) getServiceType() string { func (a *cassandraAdapter) getDiskSpace() string { return a.Spec.DiskSpace } + +func (a *cassandraAdapter) performUpgradeTaskIfNeeded(ctx context.Context, avn *aiven.Client, avnGen avngen.Client, old *aiven.Service) error { + return nil +} diff --git a/controllers/clickhouse_controller.go b/controllers/clickhouse_controller.go index 18633ad5..bed394b5 100644 --- a/controllers/clickhouse_controller.go +++ b/controllers/clickhouse_controller.go @@ -7,6 +7,7 @@ import ( "fmt" "github.com/aiven/aiven-go-client/v2" + avngen "github.com/aiven/go-client-codegen" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ctrl "sigs.k8s.io/controller-runtime" @@ -62,7 +63,7 @@ func (a *clickhouseAdapter) getServiceCommonSpec() *v1alpha1.ServiceCommonSpec { } func (a *clickhouseAdapter) getUserConfig() any { - return &a.Spec.UserConfig + return a.Spec.UserConfig } func (a *clickhouseAdapter) newSecret(ctx context.Context, s *aiven.Service) (*corev1.Secret, error) { @@ -89,3 +90,7 @@ func (a *clickhouseAdapter) getServiceType() string { func (a *clickhouseAdapter) getDiskSpace() string { return a.Spec.DiskSpace } + +func (a *clickhouseAdapter) performUpgradeTaskIfNeeded(ctx context.Context, avn *aiven.Client, avnGen avngen.Client, old *aiven.Service) error { + return nil +} diff --git a/controllers/generic_service_handler.go b/controllers/generic_service_handler.go index ec54f213..0334e9c8 100644 --- a/controllers/generic_service_handler.go +++ b/controllers/generic_service_handler.go @@ -43,7 +43,7 @@ func (h *genericServiceHandler) createOrUpdate(ctx context.Context, avn *aiven.C } } - _, err = avn.Services.Get(ctx, spec.Project, ometa.Name) + oldService, err := avn.Services.Get(ctx, spec.Project, ometa.Name) exists := err == nil if !exists && !aiven.IsNotFound(err) { return fmt.Errorf("failed to fetch service: %w", err) @@ -96,6 +96,12 @@ func (h *genericServiceHandler) createOrUpdate(ctx context.Context, avn *aiven.C return err } + // Perform upgrade task if necessary (at the moment, this is relevant only for PostgreSQL) + err = o.performUpgradeTaskIfNeeded(ctx, avn, avnGen, oldService) + if err != nil { + return err + } + req := aiven.UpdateServiceRequest{ Cloud: spec.CloudName, DiskSpaceMB: v1alpha1.ConvertDiscSpace(o.getDiskSpace()), @@ -238,4 +244,5 @@ type serviceAdapter interface { getDiskSpace() string getUserConfig() any newSecret(ctx context.Context, s *aiven.Service) (*corev1.Secret, error) + performUpgradeTaskIfNeeded(ctx context.Context, avn *aiven.Client, avnGen avngen.Client, old *aiven.Service) error } diff --git a/controllers/grafana_controller.go b/controllers/grafana_controller.go index 239977ed..a448d611 100644 --- a/controllers/grafana_controller.go +++ b/controllers/grafana_controller.go @@ -8,6 +8,7 @@ import ( "strings" "github.com/aiven/aiven-go-client/v2" + avngen "github.com/aiven/go-client-codegen" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ctrl "sigs.k8s.io/controller-runtime" @@ -63,7 +64,7 @@ func (a *grafanaAdapter) getServiceCommonSpec() *v1alpha1.ServiceCommonSpec { } func (a *grafanaAdapter) getUserConfig() any { - return &a.Spec.UserConfig + return a.Spec.UserConfig } func (a *grafanaAdapter) newSecret(ctx context.Context, s *aiven.Service) (*corev1.Secret, error) { @@ -86,3 +87,7 @@ func (a *grafanaAdapter) getServiceType() string { func (a *grafanaAdapter) getDiskSpace() string { return a.Spec.DiskSpace } + +func (a *grafanaAdapter) performUpgradeTaskIfNeeded(ctx context.Context, avn *aiven.Client, avnGen avngen.Client, old *aiven.Service) error { + return nil +} diff --git a/controllers/kafka_controller.go b/controllers/kafka_controller.go index f44e2959..055aa67c 100644 --- a/controllers/kafka_controller.go +++ b/controllers/kafka_controller.go @@ -8,6 +8,7 @@ import ( "strconv" "github.com/aiven/aiven-go-client/v2" + avngen "github.com/aiven/go-client-codegen" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ctrl "sigs.k8s.io/controller-runtime" @@ -63,7 +64,7 @@ func (a *kafkaAdapter) getServiceCommonSpec() *v1alpha1.ServiceCommonSpec { } func (a *kafkaAdapter) getUserConfig() any { - return &a.Spec.UserConfig + return a.Spec.UserConfig } func (a *kafkaAdapter) newSecret(ctx context.Context, s *aiven.Service) (*corev1.Secret, error) { @@ -121,3 +122,7 @@ func (a *kafkaAdapter) getServiceType() string { func (a *kafkaAdapter) getDiskSpace() string { return a.Spec.DiskSpace } + +func (a *kafkaAdapter) performUpgradeTaskIfNeeded(ctx context.Context, avn *aiven.Client, avnGen avngen.Client, old *aiven.Service) error { + return nil +} diff --git a/controllers/kafkaconnect_controller.go b/controllers/kafkaconnect_controller.go index cdf4bb79..cf14bc58 100644 --- a/controllers/kafkaconnect_controller.go +++ b/controllers/kafkaconnect_controller.go @@ -7,6 +7,7 @@ import ( "fmt" "github.com/aiven/aiven-go-client/v2" + avngen "github.com/aiven/go-client-codegen" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ctrl "sigs.k8s.io/controller-runtime" @@ -60,7 +61,7 @@ func (a *kafkaConnectAdapter) getServiceCommonSpec() *v1alpha1.ServiceCommonSpec } func (a *kafkaConnectAdapter) getUserConfig() any { - return &a.Spec.UserConfig + return a.Spec.UserConfig } func (a *kafkaConnectAdapter) newSecret(ctx context.Context, s *aiven.Service) (*corev1.Secret, error) { @@ -78,3 +79,7 @@ func (a *kafkaConnectAdapter) getDiskSpace() string { func (a *kafkaConnectAdapter) GetConnInfoSecretTarget() v1alpha1.ConnInfoSecretTarget { return v1alpha1.ConnInfoSecretTarget{} } + +func (a *kafkaConnectAdapter) performUpgradeTaskIfNeeded(ctx context.Context, avn *aiven.Client, avnGen avngen.Client, old *aiven.Service) error { + return nil +} diff --git a/controllers/mysql_controller.go b/controllers/mysql_controller.go index ea3eb585..c4d98e83 100644 --- a/controllers/mysql_controller.go +++ b/controllers/mysql_controller.go @@ -7,6 +7,7 @@ import ( "fmt" "github.com/aiven/aiven-go-client/v2" + avngen "github.com/aiven/go-client-codegen" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ctrl "sigs.k8s.io/controller-runtime" @@ -62,7 +63,7 @@ func (a *mySQLAdapter) getServiceCommonSpec() *v1alpha1.ServiceCommonSpec { } func (a *mySQLAdapter) getUserConfig() any { - return &a.Spec.UserConfig + return a.Spec.UserConfig } func (a *mySQLAdapter) newSecret(ctx context.Context, s *aiven.Service) (*corev1.Secret, error) { @@ -87,3 +88,7 @@ func (a *mySQLAdapter) getServiceType() string { func (a *mySQLAdapter) getDiskSpace() string { return a.Spec.DiskSpace } + +func (a *mySQLAdapter) performUpgradeTaskIfNeeded(ctx context.Context, avn *aiven.Client, avnGen avngen.Client, old *aiven.Service) error { + return nil +} diff --git a/controllers/opensearch_controller.go b/controllers/opensearch_controller.go index fd83a5aa..5884b38d 100644 --- a/controllers/opensearch_controller.go +++ b/controllers/opensearch_controller.go @@ -7,6 +7,7 @@ import ( "fmt" "github.com/aiven/aiven-go-client/v2" + avngen "github.com/aiven/go-client-codegen" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ctrl "sigs.k8s.io/controller-runtime" @@ -64,7 +65,7 @@ func (a *opensearchAdapter) getServiceCommonSpec() *v1alpha1.ServiceCommonSpec { } func (a *opensearchAdapter) getUserConfig() any { - return &a.Spec.UserConfig + return a.Spec.UserConfig } func (a *opensearchAdapter) newSecret(ctx context.Context, s *aiven.Service) (*corev1.Secret, error) { @@ -91,3 +92,7 @@ func (a *opensearchAdapter) getServiceType() string { func (a *opensearchAdapter) getDiskSpace() string { return a.Spec.DiskSpace } + +func (a *opensearchAdapter) performUpgradeTaskIfNeeded(ctx context.Context, avn *aiven.Client, avnGen avngen.Client, old *aiven.Service) error { + return nil +} diff --git a/controllers/postgresql_controller.go b/controllers/postgresql_controller.go index d3ce2cde..874470e2 100644 --- a/controllers/postgresql_controller.go +++ b/controllers/postgresql_controller.go @@ -5,14 +5,18 @@ package controllers import ( "context" "fmt" + "time" "github.com/aiven/aiven-go-client/v2" + avngen "github.com/aiven/go-client-codegen" + "github.com/aiven/go-client-codegen/handler/service" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "github.com/aiven/aiven-operator/api/v1alpha1" + pguserconfig "github.com/aiven/aiven-operator/api/v1alpha1/userconfig/service/pg" ) // PostgreSQLReconciler reconciles a PostgreSQL object @@ -20,6 +24,8 @@ type PostgreSQLReconciler struct { Controller } +const waitForTaskToCompleteInterval = time.Second * 10 + //+kubebuilder:rbac:groups=aiven.io,resources=postgresqls,verbs=get;list;watch;create;update;patch;delete //+kubebuilder:rbac:groups=aiven.io,resources=postgresqls/status,verbs=get;update;patch //+kubebuilder:rbac:groups=aiven.io,resources=postgresqls/finalizers,verbs=get;list;watch;create;update;patch;delete @@ -61,7 +67,7 @@ func (a *postgreSQLAdapter) getServiceCommonSpec() *v1alpha1.ServiceCommonSpec { } func (a *postgreSQLAdapter) getUserConfig() any { - return &a.Spec.UserConfig + return a.Spec.UserConfig } func (a *postgreSQLAdapter) newSecret(ctx context.Context, s *aiven.Service) (*corev1.Secret, error) { @@ -94,3 +100,59 @@ func (a *postgreSQLAdapter) getServiceType() string { func (a *postgreSQLAdapter) getDiskSpace() string { return a.Spec.DiskSpace } + +func (a *postgreSQLAdapter) performUpgradeTaskIfNeeded(ctx context.Context, avn *aiven.Client, avnGen avngen.Client, old *aiven.Service) error { + var currentVersion string = old.UserConfig["pg_version"].(string) + targetUserConfig := a.getUserConfig().(*pguserconfig.PgUserConfig) + if targetUserConfig == nil || targetUserConfig.PgVersion == nil { + return nil + } + var targetVersion string = *targetUserConfig.PgVersion + + // No need to upgrade if pg_version hasn't changed + if targetVersion == currentVersion { + return nil + } + + task, err := avnGen.ServiceTaskCreate(ctx, a.getServiceCommonSpec().Project, a.getObjectMeta().Name, &service.ServiceTaskCreateIn{ + TargetVersion: service.TargetVersionType(targetVersion), + TaskType: service.TaskTypeUpgradeCheck, + }) + if err != nil { + return fmt.Errorf("cannot create PG upgrade check task: %q", err) + } + + finalTaskResult, err := waitForTaskToComplete(ctx, func() (bool, *aiven.ServiceTask, error) { + t, getErr := avn.ServiceTask.Get(ctx, a.getServiceCommonSpec().Project, a.getObjectMeta().Name, task.TaskId) + if getErr != nil { + return true, nil, fmt.Errorf("error fetching service task %s: %q", t.Task.Id, getErr) + } + + if t.Task.Success == nil { + return false, nil, nil + } + return true, &t.Task, nil + }) + if err != nil { + return err + } + if !*finalTaskResult.Success { + return fmt.Errorf("PG service upgrade check error, version upgrade from %s to %s, result: %s", currentVersion, targetVersion, finalTaskResult.Result) + } + return nil +} + +func waitForTaskToComplete[T any](ctx context.Context, f func() (bool, *T, error)) (*T, error) { + var err error + for { + select { + case <-ctx.Done(): + return nil, fmt.Errorf("context timeout while retrying operation, error=%q", err) + case <-time.After(waitForTaskToCompleteInterval): + finished, val, err := f() + if finished { + return val, err + } + } + } +} diff --git a/controllers/redis_controller.go b/controllers/redis_controller.go index 58b179b1..9def5c1b 100644 --- a/controllers/redis_controller.go +++ b/controllers/redis_controller.go @@ -7,6 +7,7 @@ import ( "fmt" "github.com/aiven/aiven-go-client/v2" + avngen "github.com/aiven/go-client-codegen" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ctrl "sigs.k8s.io/controller-runtime" @@ -64,7 +65,7 @@ func (a *redisAdapter) getServiceCommonSpec() *v1alpha1.ServiceCommonSpec { } func (a *redisAdapter) getUserConfig() any { - return &a.Spec.UserConfig + return a.Spec.UserConfig } func (a *redisAdapter) newSecret(ctx context.Context, s *aiven.Service) (*corev1.Secret, error) { @@ -93,3 +94,7 @@ func (a *redisAdapter) getServiceType() string { func (a *redisAdapter) getDiskSpace() string { return a.Spec.DiskSpace } + +func (a *redisAdapter) performUpgradeTaskIfNeeded(ctx context.Context, avn *aiven.Client, avnGen avngen.Client, old *aiven.Service) error { + return nil +} diff --git a/tests/postgresql_test.go b/tests/postgresql_test.go index f0dc986c..e56a893e 100644 --- a/tests/postgresql_test.go +++ b/tests/postgresql_test.go @@ -5,8 +5,11 @@ import ( "fmt" "testing" + "github.com/aiven/aiven-go-client/v2" + "github.com/aiven/go-client-codegen/handler/service" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "k8s.io/apimachinery/pkg/types" "github.com/aiven/aiven-operator/api/v1alpha1" ) @@ -247,3 +250,68 @@ func TestPgCustomPrefix(t *testing.T) { assert.NotEmpty(t, secret.Data["MY_PG_DATABASE_URI"]) assert.NotEmpty(t, secret.Data["MY_PG_CA_CERT"]) } + +func getPgUpgradeVersionYaml(project, pgName, cloudName, version string) string { + return fmt.Sprintf(` + apiVersion: aiven.io/v1alpha1 + kind: PostgreSQL + metadata: + name: %[2]s + spec: + authSecretRef: + name: aiven-token + key: token + project: %[1]s + cloudName: %[3]s + plan: startup-4 + userConfig: + pg_version: "%[4]s" +`, project, pgName, cloudName, version) +} + +func TestPgUpgradeVersion(t *testing.T) { + t.Parallel() + defer recoverPanic(t) + + pgVersions := service.TargetVersionTypeChoices() + startingVersion := pgVersions[len(pgVersions)-2] + targetVersion := pgVersions[len(pgVersions)-1] + + ctx := context.Background() + pgName := randName("upgrade-test") + yaml := getPgUpgradeVersionYaml(cfg.Project, pgName, cfg.PrimaryCloudName, startingVersion) + s := NewSession(k8sClient, avnClient, cfg.Project) + + defer s.Destroy() + + require.NoError(t, s.Apply(yaml)) + + pg := new(v1alpha1.PostgreSQL) + require.NoError(t, s.GetRunning(pg, pgName)) + + pgAvn, err := avnClient.Services.Get(ctx, cfg.Project, pgName) + require.NoError(t, err) + assert.Equal(t, startingVersion, pgAvn.UserConfig["pg_version"]) + assert.Equal(t, anyPointer(startingVersion), pg.Spec.UserConfig.PgVersion) + + require.NotNil(t, pg.Spec.UserConfig) + assert.NotNil(t, pgAvn.UserConfig) + + updatedYaml := getPgUpgradeVersionYaml(cfg.Project, pgName, cfg.PrimaryCloudName, targetVersion) + require.NoError(t, s.Apply(updatedYaml)) + + // Verify that the service was upgraded successfully + var pgAvnUpd *aiven.Service + require.NoError(t, retryForever(ctx, "check that PG version was upgraded", func() (bool, error) { + pgAvnUpd, err = avnClient.Services.Get(ctx, cfg.Project, pgName) + if err != nil { + return false, err + } + + return pgAvnUpd.UserConfig["pg_version"] != startingVersion, nil + })) + + pgUpd := new(v1alpha1.PostgreSQL) + require.NoError(t, k8sClient.Get(ctx, types.NamespacedName{Name: pgName, Namespace: "default"}, pgUpd)) + assert.Equal(t, targetVersion, *pgUpd.Spec.UserConfig.PgVersion) +}