From 969cb5af378459e655f9dd9a798bc6893e618755 Mon Sep 17 00:00:00 2001 From: Jeffrey Held Date: Fri, 16 Feb 2024 14:41:21 +0100 Subject: [PATCH] feat: check PG upgrade task before attempting to upgrade service --- controllers/cassandra_controller.go | 10 ++++ controllers/clickhouse_controller.go | 10 ++++ controllers/generic_service_handler.go | 13 ++++- controllers/grafana_controller.go | 10 ++++ controllers/kafka_controller.go | 10 ++++ controllers/kafkaconnect_controller.go | 10 ++++ controllers/mysql_controller.go | 10 ++++ controllers/opensearch_controller.go | 10 ++++ controllers/postgresql_controller.go | 60 ++++++++++++++++++++- controllers/redis_controller.go | 10 ++++ tests/postgresql_test.go | 75 ++++++++++++++++++++++++++ 11 files changed, 226 insertions(+), 2 deletions(-) diff --git a/controllers/cassandra_controller.go b/controllers/cassandra_controller.go index 7095af895..c49d7239a 100644 --- a/controllers/cassandra_controller.go +++ b/controllers/cassandra_controller.go @@ -8,6 +8,8 @@ import ( "strings" "github.com/aiven/aiven-go-client/v2" + avngen "github.com/aiven/go-client-codegen" + "github.com/aiven/go-client-codegen/handler/service" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ctrl "sigs.k8s.io/controller-runtime" @@ -86,3 +88,11 @@ func (a *cassandraAdapter) getServiceType() string { func (a *cassandraAdapter) getDiskSpace() string { return a.Spec.DiskSpace } + +func (a *cassandraAdapter) requiresUpgradeTask(old *service.ServiceGetOut) bool { + return false +} + +func (a *cassandraAdapter) performUpgradeTask(ctx context.Context, avn *aiven.Client, avnGen avngen.Client, old *service.ServiceGetOut) error { + return nil +} diff --git a/controllers/clickhouse_controller.go b/controllers/clickhouse_controller.go index 18633ad54..1fa553940 100644 --- a/controllers/clickhouse_controller.go +++ b/controllers/clickhouse_controller.go @@ -7,6 +7,8 @@ import ( "fmt" "github.com/aiven/aiven-go-client/v2" + avngen "github.com/aiven/go-client-codegen" + "github.com/aiven/go-client-codegen/handler/service" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ctrl "sigs.k8s.io/controller-runtime" @@ -89,3 +91,11 @@ func (a *clickhouseAdapter) getServiceType() string { func (a *clickhouseAdapter) getDiskSpace() string { return a.Spec.DiskSpace } + +func (a *clickhouseAdapter) requiresUpgradeTask(old *service.ServiceGetOut) bool { + return false +} + +func (a *clickhouseAdapter) performUpgradeTask(ctx context.Context, avn *aiven.Client, avnGen avngen.Client, old *service.ServiceGetOut) error { + return nil +} diff --git a/controllers/generic_service_handler.go b/controllers/generic_service_handler.go index ec54f2137..dda9cb691 100644 --- a/controllers/generic_service_handler.go +++ b/controllers/generic_service_handler.go @@ -7,6 +7,7 @@ import ( "github.com/aiven/aiven-go-client/v2" avngen "github.com/aiven/go-client-codegen" + "github.com/aiven/go-client-codegen/handler/service" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -43,7 +44,7 @@ func (h *genericServiceHandler) createOrUpdate(ctx context.Context, avn *aiven.C } } - _, err = avn.Services.Get(ctx, spec.Project, ometa.Name) + oldService, err := avnGen.ServiceGet(ctx, spec.Project, ometa.Name) exists := err == nil if !exists && !aiven.IsNotFound(err) { return fmt.Errorf("failed to fetch service: %w", err) @@ -96,6 +97,14 @@ func (h *genericServiceHandler) createOrUpdate(ctx context.Context, avn *aiven.C return err } + // Check that upgrade task is needed + if o.requiresUpgradeTask(oldService) { + err = o.performUpgradeTask(ctx, avn, avnGen, oldService) + if err != nil { + return err + } + } + req := aiven.UpdateServiceRequest{ Cloud: spec.CloudName, DiskSpaceMB: v1alpha1.ConvertDiscSpace(o.getDiskSpace()), @@ -238,4 +247,6 @@ type serviceAdapter interface { getDiskSpace() string getUserConfig() any newSecret(ctx context.Context, s *aiven.Service) (*corev1.Secret, error) + requiresUpgradeTask(old *service.ServiceGetOut) bool + performUpgradeTask(ctx context.Context, avn *aiven.Client, avnGen avngen.Client, old *service.ServiceGetOut) error } diff --git a/controllers/grafana_controller.go b/controllers/grafana_controller.go index 239977edf..95fccd7c6 100644 --- a/controllers/grafana_controller.go +++ b/controllers/grafana_controller.go @@ -8,6 +8,8 @@ import ( "strings" "github.com/aiven/aiven-go-client/v2" + avngen "github.com/aiven/go-client-codegen" + "github.com/aiven/go-client-codegen/handler/service" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ctrl "sigs.k8s.io/controller-runtime" @@ -86,3 +88,11 @@ func (a *grafanaAdapter) getServiceType() string { func (a *grafanaAdapter) getDiskSpace() string { return a.Spec.DiskSpace } + +func (a *grafanaAdapter) requiresUpgradeTask(old *service.ServiceGetOut) bool { + return false +} + +func (a *grafanaAdapter) performUpgradeTask(ctx context.Context, avn *aiven.Client, avnGen avngen.Client, old *service.ServiceGetOut) error { + return nil +} diff --git a/controllers/kafka_controller.go b/controllers/kafka_controller.go index f44e2959c..c723717ba 100644 --- a/controllers/kafka_controller.go +++ b/controllers/kafka_controller.go @@ -8,6 +8,8 @@ import ( "strconv" "github.com/aiven/aiven-go-client/v2" + avngen "github.com/aiven/go-client-codegen" + "github.com/aiven/go-client-codegen/handler/service" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ctrl "sigs.k8s.io/controller-runtime" @@ -121,3 +123,11 @@ func (a *kafkaAdapter) getServiceType() string { func (a *kafkaAdapter) getDiskSpace() string { return a.Spec.DiskSpace } + +func (a *kafkaAdapter) requiresUpgradeTask(old *service.ServiceGetOut) bool { + return false +} + +func (a *kafkaAdapter) performUpgradeTask(ctx context.Context, avn *aiven.Client, avnGen avngen.Client, old *service.ServiceGetOut) error { + return nil +} diff --git a/controllers/kafkaconnect_controller.go b/controllers/kafkaconnect_controller.go index cdf4bb799..8b189c4a7 100644 --- a/controllers/kafkaconnect_controller.go +++ b/controllers/kafkaconnect_controller.go @@ -7,6 +7,8 @@ import ( "fmt" "github.com/aiven/aiven-go-client/v2" + avngen "github.com/aiven/go-client-codegen" + "github.com/aiven/go-client-codegen/handler/service" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ctrl "sigs.k8s.io/controller-runtime" @@ -78,3 +80,11 @@ func (a *kafkaConnectAdapter) getDiskSpace() string { func (a *kafkaConnectAdapter) GetConnInfoSecretTarget() v1alpha1.ConnInfoSecretTarget { return v1alpha1.ConnInfoSecretTarget{} } + +func (a *kafkaConnectAdapter) requiresUpgradeTask(old *service.ServiceGetOut) bool { + return false +} + +func (a *kafkaConnectAdapter) performUpgradeTask(ctx context.Context, avn *aiven.Client, avnGen avngen.Client, old *service.ServiceGetOut) error { + return nil +} diff --git a/controllers/mysql_controller.go b/controllers/mysql_controller.go index ea3eb5857..0cd518914 100644 --- a/controllers/mysql_controller.go +++ b/controllers/mysql_controller.go @@ -7,6 +7,8 @@ import ( "fmt" "github.com/aiven/aiven-go-client/v2" + avngen "github.com/aiven/go-client-codegen" + "github.com/aiven/go-client-codegen/handler/service" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ctrl "sigs.k8s.io/controller-runtime" @@ -87,3 +89,11 @@ func (a *mySQLAdapter) getServiceType() string { func (a *mySQLAdapter) getDiskSpace() string { return a.Spec.DiskSpace } + +func (a *mySQLAdapter) requiresUpgradeTask(old *service.ServiceGetOut) bool { + return false +} + +func (a *mySQLAdapter) performUpgradeTask(ctx context.Context, avn *aiven.Client, avnGen avngen.Client, old *service.ServiceGetOut) error { + return nil +} diff --git a/controllers/opensearch_controller.go b/controllers/opensearch_controller.go index fd83a5aa1..6357cc3b3 100644 --- a/controllers/opensearch_controller.go +++ b/controllers/opensearch_controller.go @@ -7,6 +7,8 @@ import ( "fmt" "github.com/aiven/aiven-go-client/v2" + avngen "github.com/aiven/go-client-codegen" + "github.com/aiven/go-client-codegen/handler/service" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ctrl "sigs.k8s.io/controller-runtime" @@ -91,3 +93,11 @@ func (a *opensearchAdapter) getServiceType() string { func (a *opensearchAdapter) getDiskSpace() string { return a.Spec.DiskSpace } + +func (a *opensearchAdapter) requiresUpgradeTask(old *service.ServiceGetOut) bool { + return false +} + +func (a *opensearchAdapter) performUpgradeTask(ctx context.Context, avn *aiven.Client, avnGen avngen.Client, old *service.ServiceGetOut) error { + return nil +} diff --git a/controllers/postgresql_controller.go b/controllers/postgresql_controller.go index d3ce2cdec..d3bc7db48 100644 --- a/controllers/postgresql_controller.go +++ b/controllers/postgresql_controller.go @@ -5,14 +5,18 @@ package controllers import ( "context" "fmt" + "time" "github.com/aiven/aiven-go-client/v2" + avngen "github.com/aiven/go-client-codegen" + "github.com/aiven/go-client-codegen/handler/service" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "github.com/aiven/aiven-operator/api/v1alpha1" + pguserconfig "github.com/aiven/aiven-operator/api/v1alpha1/userconfig/service/pg" ) // PostgreSQLReconciler reconciles a PostgreSQL object @@ -20,6 +24,8 @@ type PostgreSQLReconciler struct { Controller } +const waitForTaskToCompleteInterval = time.Second * 10 + //+kubebuilder:rbac:groups=aiven.io,resources=postgresqls,verbs=get;list;watch;create;update;patch;delete //+kubebuilder:rbac:groups=aiven.io,resources=postgresqls/status,verbs=get;update;patch //+kubebuilder:rbac:groups=aiven.io,resources=postgresqls/finalizers,verbs=get;list;watch;create;update;patch;delete @@ -61,7 +67,7 @@ func (a *postgreSQLAdapter) getServiceCommonSpec() *v1alpha1.ServiceCommonSpec { } func (a *postgreSQLAdapter) getUserConfig() any { - return &a.Spec.UserConfig + return a.Spec.UserConfig } func (a *postgreSQLAdapter) newSecret(ctx context.Context, s *aiven.Service) (*corev1.Secret, error) { @@ -94,3 +100,55 @@ func (a *postgreSQLAdapter) getServiceType() string { func (a *postgreSQLAdapter) getDiskSpace() string { return a.Spec.DiskSpace } + +func (a *postgreSQLAdapter) requiresUpgradeTask(old *service.ServiceGetOut) bool { + var version string = *a.getUserConfig().(pguserconfig.PgUserConfig).PgVersion + return version != old.UserConfig["pg_version"] +} + +func (a *postgreSQLAdapter) performUpgradeTask(ctx context.Context, avn *aiven.Client, avnGen avngen.Client, old *service.ServiceGetOut) error { + var targetVersion string = *a.getUserConfig().(pguserconfig.PgUserConfig).PgVersion + task, err := avnGen.ServiceTaskCreate(ctx, a.getServiceCommonSpec().Project, a.getObjectMeta().Name, &service.ServiceTaskCreateIn{ + TargetVersion: service.TargetVersionType(targetVersion), + TaskType: service.TaskType("upgrade_check"), + }) + if err != nil { + return fmt.Errorf("cannot create PG upgrade check task: %q", err) + } + + taskGet, err := waitForTaskToComplete(ctx, func() (*aiven.ServiceTask, error) { + t, getErr := avn.ServiceTask.Get(ctx, a.getServiceCommonSpec().Project, a.getObjectMeta().Name, task.TaskId) + if getErr != nil { + return nil, fmt.Errorf("error fetching service task %s: %q", t.Task.Id, getErr) + } + + if t.Task.Success == nil { + return nil, nil + } + return &t.Task, nil + }) + if err != nil { + return err + } + + if !*taskGet.Success { + return fmt.Errorf("PG service upgrade check error, version upgrade from %s to %s, result: %s", old.UserConfig["pg_version"], targetVersion, task.Result) + } + return nil +} + +func waitForTaskToComplete[T any](ctx context.Context, f func() (*T, error)) (*T, error) { + var err error + for { + select { + case <-ctx.Done(): + return nil, fmt.Errorf("context timeout while retrying operation, error=%q", err) + case <-time.After(waitForTaskToCompleteInterval): + val, err := f() + if val == nil { + continue + } + return val, err + } + } +} diff --git a/controllers/redis_controller.go b/controllers/redis_controller.go index 58b179b1e..3feb715c3 100644 --- a/controllers/redis_controller.go +++ b/controllers/redis_controller.go @@ -7,6 +7,8 @@ import ( "fmt" "github.com/aiven/aiven-go-client/v2" + avngen "github.com/aiven/go-client-codegen" + "github.com/aiven/go-client-codegen/handler/service" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ctrl "sigs.k8s.io/controller-runtime" @@ -93,3 +95,11 @@ func (a *redisAdapter) getServiceType() string { func (a *redisAdapter) getDiskSpace() string { return a.Spec.DiskSpace } + +func (a *redisAdapter) requiresUpgradeTask(old *service.ServiceGetOut) bool { + return false +} + +func (a *redisAdapter) performUpgradeTask(ctx context.Context, avn *aiven.Client, avnGen avngen.Client, old *service.ServiceGetOut) error { + return nil +} diff --git a/tests/postgresql_test.go b/tests/postgresql_test.go index 6c0b1982a..1b92ae91a 100644 --- a/tests/postgresql_test.go +++ b/tests/postgresql_test.go @@ -5,8 +5,10 @@ import ( "fmt" "testing" + "github.com/aiven/aiven-go-client/v2" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "k8s.io/apimachinery/pkg/types" "github.com/aiven/aiven-operator/api/v1alpha1" ) @@ -247,3 +249,76 @@ func TestPgCustomPrefix(t *testing.T) { assert.NotEmpty(t, secret.Data["MY_PG_DATABASE_URI"]) assert.NotEmpty(t, secret.Data["MY_PG_CA_CERT"]) } + +func getPgUpgradeVersionYaml(project, pgName, cloudName, version string) string { + return fmt.Sprintf(` + apiVersion: aiven.io/v1alpha1 + kind: PostgreSQL + metadata: + name: %[2]s + spec: + authSecretRef: + name: aiven-token + key: token + project: %[1]s + cloudName: %[3]s + plan: startup-4 + userConfig: + pg_version: "%[4]s" +`, project, pgName, cloudName, version) +} + +func TestPgUpgradeVersion(t *testing.T) { + t.Parallel() + defer recoverPanic(t) + + // GIVEN + ctx := context.Background() + pgName := randName("secret-prefix") + yaml := getPgUpgradeVersionYaml(testProject, pgName, testPrimaryCloudName, "14") + s := NewSession(k8sClient, avnClient, testProject) + + // Cleans test afterwards + defer s.Destroy() + + // WHEN + // Applies given manifest + require.NoError(t, s.Apply(yaml)) + + // Waits kube objects + pg := new(v1alpha1.PostgreSQL) + require.NoError(t, s.GetRunning(pg, pgName)) + + // THEN + // Validates instance + pgAvn, err := avnClient.Services.Get(ctx, testProject, pgName) + require.NoError(t, err) + // Tests non-strict yaml. By sending string-integer we expect it's parsed as a string. + // Default version is 15, we get 14, as we set it. + assert.Equal(t, "14", pgAvn.UserConfig["pg_version"]) + assert.Equal(t, anyPointer("14"), pg.Spec.UserConfig.PgVersion) + + // UserConfig test + require.NotNil(t, pg.Spec.UserConfig) + assert.NotNil(t, pgAvn.UserConfig) // "Aiven instance has defaults set" + + updatedYaml := getPgUpgradeVersionYaml(testProject, pgName, testPrimaryCloudName, "16") + require.NoError(t, s.Apply(updatedYaml)) + + // Verify that the service was upgraded successfully + var pgAvnUpd *aiven.Service + require.NoError(t, retryForever(ctx, "check that PG version was upgraded", func() (bool, error) { + pgAvnUpd, err = avnClient.Services.Get(ctx, testProject, pgName) + if err != nil { + return false, err + } + + // Just waits object being updated in Aiven + return pgAvnUpd.UserConfig["pg_version"] != "14", nil + })) + + // Gets kube object + pgUpd := new(v1alpha1.PostgreSQL) + require.NoError(t, k8sClient.Get(ctx, types.NamespacedName{Name: pgName, Namespace: "default"}, pgUpd)) + assert.Equal(t, "16", *pgUpd.Spec.UserConfig.PgVersion) +}