Skip to content

Commit

Permalink
feat: check PG upgrade task before attempting to upgrade service
Browse files Browse the repository at this point in the history
  • Loading branch information
jeff-held-aiven committed Feb 19, 2024
1 parent 07b3ccd commit 969cb5a
Show file tree
Hide file tree
Showing 11 changed files with 226 additions and 2 deletions.
10 changes: 10 additions & 0 deletions controllers/cassandra_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"strings"

"github.com/aiven/aiven-go-client/v2"
avngen "github.com/aiven/go-client-codegen"
"github.com/aiven/go-client-codegen/handler/service"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
ctrl "sigs.k8s.io/controller-runtime"
Expand Down Expand Up @@ -86,3 +88,11 @@ func (a *cassandraAdapter) getServiceType() string {
func (a *cassandraAdapter) getDiskSpace() string {
return a.Spec.DiskSpace
}

func (a *cassandraAdapter) requiresUpgradeTask(old *service.ServiceGetOut) bool {
return false
}

func (a *cassandraAdapter) performUpgradeTask(ctx context.Context, avn *aiven.Client, avnGen avngen.Client, old *service.ServiceGetOut) error {
return nil
}
10 changes: 10 additions & 0 deletions controllers/clickhouse_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"fmt"

"github.com/aiven/aiven-go-client/v2"
avngen "github.com/aiven/go-client-codegen"
"github.com/aiven/go-client-codegen/handler/service"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
ctrl "sigs.k8s.io/controller-runtime"
Expand Down Expand Up @@ -89,3 +91,11 @@ func (a *clickhouseAdapter) getServiceType() string {
func (a *clickhouseAdapter) getDiskSpace() string {
return a.Spec.DiskSpace
}

func (a *clickhouseAdapter) requiresUpgradeTask(old *service.ServiceGetOut) bool {
return false
}

func (a *clickhouseAdapter) performUpgradeTask(ctx context.Context, avn *aiven.Client, avnGen avngen.Client, old *service.ServiceGetOut) error {
return nil
}
13 changes: 12 additions & 1 deletion controllers/generic_service_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/aiven/aiven-go-client/v2"
avngen "github.com/aiven/go-client-codegen"
"github.com/aiven/go-client-codegen/handler/service"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -43,7 +44,7 @@ func (h *genericServiceHandler) createOrUpdate(ctx context.Context, avn *aiven.C
}
}

_, err = avn.Services.Get(ctx, spec.Project, ometa.Name)
oldService, err := avnGen.ServiceGet(ctx, spec.Project, ometa.Name)
exists := err == nil
if !exists && !aiven.IsNotFound(err) {
return fmt.Errorf("failed to fetch service: %w", err)
Expand Down Expand Up @@ -96,6 +97,14 @@ func (h *genericServiceHandler) createOrUpdate(ctx context.Context, avn *aiven.C
return err
}

// Check that upgrade task is needed
if o.requiresUpgradeTask(oldService) {
err = o.performUpgradeTask(ctx, avn, avnGen, oldService)
if err != nil {
return err
}
}

req := aiven.UpdateServiceRequest{
Cloud: spec.CloudName,
DiskSpaceMB: v1alpha1.ConvertDiscSpace(o.getDiskSpace()),
Expand Down Expand Up @@ -238,4 +247,6 @@ type serviceAdapter interface {
getDiskSpace() string
getUserConfig() any
newSecret(ctx context.Context, s *aiven.Service) (*corev1.Secret, error)
requiresUpgradeTask(old *service.ServiceGetOut) bool
performUpgradeTask(ctx context.Context, avn *aiven.Client, avnGen avngen.Client, old *service.ServiceGetOut) error
}
10 changes: 10 additions & 0 deletions controllers/grafana_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"strings"

"github.com/aiven/aiven-go-client/v2"
avngen "github.com/aiven/go-client-codegen"
"github.com/aiven/go-client-codegen/handler/service"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
ctrl "sigs.k8s.io/controller-runtime"
Expand Down Expand Up @@ -86,3 +88,11 @@ func (a *grafanaAdapter) getServiceType() string {
func (a *grafanaAdapter) getDiskSpace() string {
return a.Spec.DiskSpace
}

func (a *grafanaAdapter) requiresUpgradeTask(old *service.ServiceGetOut) bool {
return false
}

func (a *grafanaAdapter) performUpgradeTask(ctx context.Context, avn *aiven.Client, avnGen avngen.Client, old *service.ServiceGetOut) error {
return nil
}
10 changes: 10 additions & 0 deletions controllers/kafka_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"strconv"

"github.com/aiven/aiven-go-client/v2"
avngen "github.com/aiven/go-client-codegen"
"github.com/aiven/go-client-codegen/handler/service"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
ctrl "sigs.k8s.io/controller-runtime"
Expand Down Expand Up @@ -121,3 +123,11 @@ func (a *kafkaAdapter) getServiceType() string {
func (a *kafkaAdapter) getDiskSpace() string {
return a.Spec.DiskSpace
}

func (a *kafkaAdapter) requiresUpgradeTask(old *service.ServiceGetOut) bool {
return false
}

func (a *kafkaAdapter) performUpgradeTask(ctx context.Context, avn *aiven.Client, avnGen avngen.Client, old *service.ServiceGetOut) error {
return nil
}
10 changes: 10 additions & 0 deletions controllers/kafkaconnect_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"fmt"

"github.com/aiven/aiven-go-client/v2"
avngen "github.com/aiven/go-client-codegen"
"github.com/aiven/go-client-codegen/handler/service"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
ctrl "sigs.k8s.io/controller-runtime"
Expand Down Expand Up @@ -78,3 +80,11 @@ func (a *kafkaConnectAdapter) getDiskSpace() string {
func (a *kafkaConnectAdapter) GetConnInfoSecretTarget() v1alpha1.ConnInfoSecretTarget {
return v1alpha1.ConnInfoSecretTarget{}
}

func (a *kafkaConnectAdapter) requiresUpgradeTask(old *service.ServiceGetOut) bool {
return false
}

func (a *kafkaConnectAdapter) performUpgradeTask(ctx context.Context, avn *aiven.Client, avnGen avngen.Client, old *service.ServiceGetOut) error {
return nil
}
10 changes: 10 additions & 0 deletions controllers/mysql_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"fmt"

"github.com/aiven/aiven-go-client/v2"
avngen "github.com/aiven/go-client-codegen"
"github.com/aiven/go-client-codegen/handler/service"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
ctrl "sigs.k8s.io/controller-runtime"
Expand Down Expand Up @@ -87,3 +89,11 @@ func (a *mySQLAdapter) getServiceType() string {
func (a *mySQLAdapter) getDiskSpace() string {
return a.Spec.DiskSpace
}

func (a *mySQLAdapter) requiresUpgradeTask(old *service.ServiceGetOut) bool {
return false
}

func (a *mySQLAdapter) performUpgradeTask(ctx context.Context, avn *aiven.Client, avnGen avngen.Client, old *service.ServiceGetOut) error {
return nil
}
10 changes: 10 additions & 0 deletions controllers/opensearch_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"fmt"

"github.com/aiven/aiven-go-client/v2"
avngen "github.com/aiven/go-client-codegen"
"github.com/aiven/go-client-codegen/handler/service"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
ctrl "sigs.k8s.io/controller-runtime"
Expand Down Expand Up @@ -91,3 +93,11 @@ func (a *opensearchAdapter) getServiceType() string {
func (a *opensearchAdapter) getDiskSpace() string {
return a.Spec.DiskSpace
}

func (a *opensearchAdapter) requiresUpgradeTask(old *service.ServiceGetOut) bool {
return false
}

func (a *opensearchAdapter) performUpgradeTask(ctx context.Context, avn *aiven.Client, avnGen avngen.Client, old *service.ServiceGetOut) error {
return nil
}
60 changes: 59 additions & 1 deletion controllers/postgresql_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,27 @@ package controllers
import (
"context"
"fmt"
"time"

"github.com/aiven/aiven-go-client/v2"
avngen "github.com/aiven/go-client-codegen"
"github.com/aiven/go-client-codegen/handler/service"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/aiven/aiven-operator/api/v1alpha1"
pguserconfig "github.com/aiven/aiven-operator/api/v1alpha1/userconfig/service/pg"
)

// PostgreSQLReconciler reconciles a PostgreSQL object
type PostgreSQLReconciler struct {
Controller
}

const waitForTaskToCompleteInterval = time.Second * 10

//+kubebuilder:rbac:groups=aiven.io,resources=postgresqls,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=aiven.io,resources=postgresqls/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=aiven.io,resources=postgresqls/finalizers,verbs=get;list;watch;create;update;patch;delete
Expand Down Expand Up @@ -61,7 +67,7 @@ func (a *postgreSQLAdapter) getServiceCommonSpec() *v1alpha1.ServiceCommonSpec {
}

func (a *postgreSQLAdapter) getUserConfig() any {
return &a.Spec.UserConfig
return a.Spec.UserConfig
}

func (a *postgreSQLAdapter) newSecret(ctx context.Context, s *aiven.Service) (*corev1.Secret, error) {
Expand Down Expand Up @@ -94,3 +100,55 @@ func (a *postgreSQLAdapter) getServiceType() string {
func (a *postgreSQLAdapter) getDiskSpace() string {
return a.Spec.DiskSpace
}

func (a *postgreSQLAdapter) requiresUpgradeTask(old *service.ServiceGetOut) bool {
var version string = *a.getUserConfig().(pguserconfig.PgUserConfig).PgVersion
return version != old.UserConfig["pg_version"]
}

func (a *postgreSQLAdapter) performUpgradeTask(ctx context.Context, avn *aiven.Client, avnGen avngen.Client, old *service.ServiceGetOut) error {
var targetVersion string = *a.getUserConfig().(pguserconfig.PgUserConfig).PgVersion
task, err := avnGen.ServiceTaskCreate(ctx, a.getServiceCommonSpec().Project, a.getObjectMeta().Name, &service.ServiceTaskCreateIn{
TargetVersion: service.TargetVersionType(targetVersion),
TaskType: service.TaskType("upgrade_check"),
})
if err != nil {
return fmt.Errorf("cannot create PG upgrade check task: %q", err)
}

taskGet, err := waitForTaskToComplete(ctx, func() (*aiven.ServiceTask, error) {
t, getErr := avn.ServiceTask.Get(ctx, a.getServiceCommonSpec().Project, a.getObjectMeta().Name, task.TaskId)
if getErr != nil {
return nil, fmt.Errorf("error fetching service task %s: %q", t.Task.Id, getErr)
}

if t.Task.Success == nil {
return nil, nil
}
return &t.Task, nil
})
if err != nil {
return err
}

if !*taskGet.Success {
return fmt.Errorf("PG service upgrade check error, version upgrade from %s to %s, result: %s", old.UserConfig["pg_version"], targetVersion, task.Result)
}
return nil
}

func waitForTaskToComplete[T any](ctx context.Context, f func() (*T, error)) (*T, error) {
var err error
for {
select {
case <-ctx.Done():
return nil, fmt.Errorf("context timeout while retrying operation, error=%q", err)
case <-time.After(waitForTaskToCompleteInterval):
val, err := f()
if val == nil {
continue
}
return val, err
}
}
}
10 changes: 10 additions & 0 deletions controllers/redis_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"fmt"

"github.com/aiven/aiven-go-client/v2"
avngen "github.com/aiven/go-client-codegen"
"github.com/aiven/go-client-codegen/handler/service"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
ctrl "sigs.k8s.io/controller-runtime"
Expand Down Expand Up @@ -93,3 +95,11 @@ func (a *redisAdapter) getServiceType() string {
func (a *redisAdapter) getDiskSpace() string {
return a.Spec.DiskSpace
}

func (a *redisAdapter) requiresUpgradeTask(old *service.ServiceGetOut) bool {
return false
}

func (a *redisAdapter) performUpgradeTask(ctx context.Context, avn *aiven.Client, avnGen avngen.Client, old *service.ServiceGetOut) error {
return nil
}
75 changes: 75 additions & 0 deletions tests/postgresql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@ import (
"fmt"
"testing"

"github.com/aiven/aiven-go-client/v2"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"k8s.io/apimachinery/pkg/types"

"github.com/aiven/aiven-operator/api/v1alpha1"
)
Expand Down Expand Up @@ -247,3 +249,76 @@ func TestPgCustomPrefix(t *testing.T) {
assert.NotEmpty(t, secret.Data["MY_PG_DATABASE_URI"])
assert.NotEmpty(t, secret.Data["MY_PG_CA_CERT"])
}

func getPgUpgradeVersionYaml(project, pgName, cloudName, version string) string {
return fmt.Sprintf(`
apiVersion: aiven.io/v1alpha1
kind: PostgreSQL
metadata:
name: %[2]s
spec:
authSecretRef:
name: aiven-token
key: token
project: %[1]s
cloudName: %[3]s
plan: startup-4
userConfig:
pg_version: "%[4]s"
`, project, pgName, cloudName, version)
}

func TestPgUpgradeVersion(t *testing.T) {
t.Parallel()
defer recoverPanic(t)

// GIVEN
ctx := context.Background()
pgName := randName("secret-prefix")
yaml := getPgUpgradeVersionYaml(testProject, pgName, testPrimaryCloudName, "14")
s := NewSession(k8sClient, avnClient, testProject)

// Cleans test afterwards
defer s.Destroy()

// WHEN
// Applies given manifest
require.NoError(t, s.Apply(yaml))

// Waits kube objects
pg := new(v1alpha1.PostgreSQL)
require.NoError(t, s.GetRunning(pg, pgName))

// THEN
// Validates instance
pgAvn, err := avnClient.Services.Get(ctx, testProject, pgName)
require.NoError(t, err)
// Tests non-strict yaml. By sending string-integer we expect it's parsed as a string.
// Default version is 15, we get 14, as we set it.
assert.Equal(t, "14", pgAvn.UserConfig["pg_version"])
assert.Equal(t, anyPointer("14"), pg.Spec.UserConfig.PgVersion)

// UserConfig test
require.NotNil(t, pg.Spec.UserConfig)
assert.NotNil(t, pgAvn.UserConfig) // "Aiven instance has defaults set"

updatedYaml := getPgUpgradeVersionYaml(testProject, pgName, testPrimaryCloudName, "16")
require.NoError(t, s.Apply(updatedYaml))

// Verify that the service was upgraded successfully
var pgAvnUpd *aiven.Service
require.NoError(t, retryForever(ctx, "check that PG version was upgraded", func() (bool, error) {
pgAvnUpd, err = avnClient.Services.Get(ctx, testProject, pgName)
if err != nil {
return false, err
}

// Just waits object being updated in Aiven
return pgAvnUpd.UserConfig["pg_version"] != "14", nil
}))

// Gets kube object
pgUpd := new(v1alpha1.PostgreSQL)
require.NoError(t, k8sClient.Get(ctx, types.NamespacedName{Name: pgName, Namespace: "default"}, pgUpd))
assert.Equal(t, "16", *pgUpd.Spec.UserConfig.PgVersion)
}

0 comments on commit 969cb5a

Please sign in to comment.