Skip to content

Commit

Permalink
feat: check PG upgrade task before attempting to upgrade service
Browse files Browse the repository at this point in the history
  • Loading branch information
jeff-held-aiven committed Feb 27, 2024
1 parent bbddf24 commit 5a88f8b
Show file tree
Hide file tree
Showing 12 changed files with 184 additions and 10 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

## [MAJOR.MINOR.PATCH] - YYYY-MM-DD

- Perform upgrade tasks to check if PG service can be upgraded before updating service
- Expose project CA certificate to service secrets: `REDIS_CA_CERT`, `MYSQL_CA_CERT`, etc.
- Add `KafkaTopic` field `config.local_retention_bytes`, type `integer`: local.retention.bytes value
- Add `KafkaTopic` field `config.local_retention_ms`, type `integer`: local.retention.ms value
Expand Down
7 changes: 6 additions & 1 deletion controllers/cassandra_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"strings"

"github.com/aiven/aiven-go-client/v2"
avngen "github.com/aiven/go-client-codegen"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
ctrl "sigs.k8s.io/controller-runtime"
Expand Down Expand Up @@ -63,7 +64,7 @@ func (a *cassandraAdapter) getServiceCommonSpec() *v1alpha1.ServiceCommonSpec {
}

func (a *cassandraAdapter) getUserConfig() any {
return &a.Spec.UserConfig
return a.Spec.UserConfig
}

func (a *cassandraAdapter) newSecret(ctx context.Context, s *aiven.Service) (*corev1.Secret, error) {
Expand All @@ -86,3 +87,7 @@ func (a *cassandraAdapter) getServiceType() string {
func (a *cassandraAdapter) getDiskSpace() string {
return a.Spec.DiskSpace
}

func (a *cassandraAdapter) performUpgradeTaskIfNeeded(ctx context.Context, avn *aiven.Client, avnGen avngen.Client, old *aiven.Service) error {
return nil
}
7 changes: 6 additions & 1 deletion controllers/clickhouse_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"

"github.com/aiven/aiven-go-client/v2"
avngen "github.com/aiven/go-client-codegen"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
ctrl "sigs.k8s.io/controller-runtime"
Expand Down Expand Up @@ -62,7 +63,7 @@ func (a *clickhouseAdapter) getServiceCommonSpec() *v1alpha1.ServiceCommonSpec {
}

func (a *clickhouseAdapter) getUserConfig() any {
return &a.Spec.UserConfig
return a.Spec.UserConfig
}

func (a *clickhouseAdapter) newSecret(ctx context.Context, s *aiven.Service) (*corev1.Secret, error) {
Expand All @@ -89,3 +90,7 @@ func (a *clickhouseAdapter) getServiceType() string {
func (a *clickhouseAdapter) getDiskSpace() string {
return a.Spec.DiskSpace
}

func (a *clickhouseAdapter) performUpgradeTaskIfNeeded(ctx context.Context, avn *aiven.Client, avnGen avngen.Client, old *aiven.Service) error {
return nil
}
9 changes: 8 additions & 1 deletion controllers/generic_service_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func (h *genericServiceHandler) createOrUpdate(ctx context.Context, avn *aiven.C
}
}

_, err = avn.Services.Get(ctx, spec.Project, ometa.Name)
oldService, err := avn.Services.Get(ctx, spec.Project, ometa.Name)
exists := err == nil
if !exists && !aiven.IsNotFound(err) {
return fmt.Errorf("failed to fetch service: %w", err)
Expand Down Expand Up @@ -96,6 +96,12 @@ func (h *genericServiceHandler) createOrUpdate(ctx context.Context, avn *aiven.C
return err
}

// Perform upgrade task if necessary (at the moment, this is relevant only for PostgreSQL)
err = o.performUpgradeTaskIfNeeded(ctx, avn, avnGen, oldService)
if err != nil {
return err
}

req := aiven.UpdateServiceRequest{
Cloud: spec.CloudName,
DiskSpaceMB: v1alpha1.ConvertDiscSpace(o.getDiskSpace()),
Expand Down Expand Up @@ -238,4 +244,5 @@ type serviceAdapter interface {
getDiskSpace() string
getUserConfig() any
newSecret(ctx context.Context, s *aiven.Service) (*corev1.Secret, error)
performUpgradeTaskIfNeeded(ctx context.Context, avn *aiven.Client, avnGen avngen.Client, old *aiven.Service) error
}
7 changes: 6 additions & 1 deletion controllers/grafana_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"strings"

"github.com/aiven/aiven-go-client/v2"
avngen "github.com/aiven/go-client-codegen"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
ctrl "sigs.k8s.io/controller-runtime"
Expand Down Expand Up @@ -63,7 +64,7 @@ func (a *grafanaAdapter) getServiceCommonSpec() *v1alpha1.ServiceCommonSpec {
}

func (a *grafanaAdapter) getUserConfig() any {
return &a.Spec.UserConfig
return a.Spec.UserConfig
}

func (a *grafanaAdapter) newSecret(ctx context.Context, s *aiven.Service) (*corev1.Secret, error) {
Expand All @@ -86,3 +87,7 @@ func (a *grafanaAdapter) getServiceType() string {
func (a *grafanaAdapter) getDiskSpace() string {
return a.Spec.DiskSpace
}

func (a *grafanaAdapter) performUpgradeTaskIfNeeded(ctx context.Context, avn *aiven.Client, avnGen avngen.Client, old *aiven.Service) error {
return nil
}
7 changes: 6 additions & 1 deletion controllers/kafka_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"strconv"

"github.com/aiven/aiven-go-client/v2"
avngen "github.com/aiven/go-client-codegen"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
ctrl "sigs.k8s.io/controller-runtime"
Expand Down Expand Up @@ -63,7 +64,7 @@ func (a *kafkaAdapter) getServiceCommonSpec() *v1alpha1.ServiceCommonSpec {
}

func (a *kafkaAdapter) getUserConfig() any {
return &a.Spec.UserConfig
return a.Spec.UserConfig
}

func (a *kafkaAdapter) newSecret(ctx context.Context, s *aiven.Service) (*corev1.Secret, error) {
Expand Down Expand Up @@ -121,3 +122,7 @@ func (a *kafkaAdapter) getServiceType() string {
func (a *kafkaAdapter) getDiskSpace() string {
return a.Spec.DiskSpace
}

func (a *kafkaAdapter) performUpgradeTaskIfNeeded(ctx context.Context, avn *aiven.Client, avnGen avngen.Client, old *aiven.Service) error {
return nil
}
7 changes: 6 additions & 1 deletion controllers/kafkaconnect_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"

"github.com/aiven/aiven-go-client/v2"
avngen "github.com/aiven/go-client-codegen"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
ctrl "sigs.k8s.io/controller-runtime"
Expand Down Expand Up @@ -60,7 +61,7 @@ func (a *kafkaConnectAdapter) getServiceCommonSpec() *v1alpha1.ServiceCommonSpec
}

func (a *kafkaConnectAdapter) getUserConfig() any {
return &a.Spec.UserConfig
return a.Spec.UserConfig
}

func (a *kafkaConnectAdapter) newSecret(ctx context.Context, s *aiven.Service) (*corev1.Secret, error) {
Expand All @@ -78,3 +79,7 @@ func (a *kafkaConnectAdapter) getDiskSpace() string {
func (a *kafkaConnectAdapter) GetConnInfoSecretTarget() v1alpha1.ConnInfoSecretTarget {
return v1alpha1.ConnInfoSecretTarget{}
}

func (a *kafkaConnectAdapter) performUpgradeTaskIfNeeded(ctx context.Context, avn *aiven.Client, avnGen avngen.Client, old *aiven.Service) error {
return nil
}
7 changes: 6 additions & 1 deletion controllers/mysql_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"

"github.com/aiven/aiven-go-client/v2"
avngen "github.com/aiven/go-client-codegen"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
ctrl "sigs.k8s.io/controller-runtime"
Expand Down Expand Up @@ -62,7 +63,7 @@ func (a *mySQLAdapter) getServiceCommonSpec() *v1alpha1.ServiceCommonSpec {
}

func (a *mySQLAdapter) getUserConfig() any {
return &a.Spec.UserConfig
return a.Spec.UserConfig
}

func (a *mySQLAdapter) newSecret(ctx context.Context, s *aiven.Service) (*corev1.Secret, error) {
Expand All @@ -87,3 +88,7 @@ func (a *mySQLAdapter) getServiceType() string {
func (a *mySQLAdapter) getDiskSpace() string {
return a.Spec.DiskSpace
}

func (a *mySQLAdapter) performUpgradeTaskIfNeeded(ctx context.Context, avn *aiven.Client, avnGen avngen.Client, old *aiven.Service) error {
return nil
}
7 changes: 6 additions & 1 deletion controllers/opensearch_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"

"github.com/aiven/aiven-go-client/v2"
avngen "github.com/aiven/go-client-codegen"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
ctrl "sigs.k8s.io/controller-runtime"
Expand Down Expand Up @@ -64,7 +65,7 @@ func (a *opensearchAdapter) getServiceCommonSpec() *v1alpha1.ServiceCommonSpec {
}

func (a *opensearchAdapter) getUserConfig() any {
return &a.Spec.UserConfig
return a.Spec.UserConfig
}

func (a *opensearchAdapter) newSecret(ctx context.Context, s *aiven.Service) (*corev1.Secret, error) {
Expand All @@ -91,3 +92,7 @@ func (a *opensearchAdapter) getServiceType() string {
func (a *opensearchAdapter) getDiskSpace() string {
return a.Spec.DiskSpace
}

func (a *opensearchAdapter) performUpgradeTaskIfNeeded(ctx context.Context, avn *aiven.Client, avnGen avngen.Client, old *aiven.Service) error {
return nil
}
60 changes: 59 additions & 1 deletion controllers/postgresql_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,27 @@ package controllers
import (
"context"
"fmt"
"time"

"github.com/aiven/aiven-go-client/v2"
avngen "github.com/aiven/go-client-codegen"
"github.com/aiven/go-client-codegen/handler/service"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/aiven/aiven-operator/api/v1alpha1"
pguserconfig "github.com/aiven/aiven-operator/api/v1alpha1/userconfig/service/pg"
)

// PostgreSQLReconciler reconciles a PostgreSQL object
type PostgreSQLReconciler struct {
Controller
}

const waitForTaskToCompleteInterval = time.Second * 10

//+kubebuilder:rbac:groups=aiven.io,resources=postgresqls,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=aiven.io,resources=postgresqls/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=aiven.io,resources=postgresqls/finalizers,verbs=get;list;watch;create;update;patch;delete
Expand Down Expand Up @@ -61,7 +67,7 @@ func (a *postgreSQLAdapter) getServiceCommonSpec() *v1alpha1.ServiceCommonSpec {
}

func (a *postgreSQLAdapter) getUserConfig() any {
return &a.Spec.UserConfig
return a.Spec.UserConfig
}

func (a *postgreSQLAdapter) newSecret(ctx context.Context, s *aiven.Service) (*corev1.Secret, error) {
Expand Down Expand Up @@ -94,3 +100,55 @@ func (a *postgreSQLAdapter) getServiceType() string {
func (a *postgreSQLAdapter) getDiskSpace() string {
return a.Spec.DiskSpace
}

func (a *postgreSQLAdapter) performUpgradeTaskIfNeeded(ctx context.Context, avn *aiven.Client, avnGen avngen.Client, old *aiven.Service) error {
var currentVersion string = old.UserConfig["pg_version"].(string)
var targetVersion string = *a.getUserConfig().(*pguserconfig.PgUserConfig).PgVersion

// No need to upgrade if pg_version hasn't changed
if targetVersion == currentVersion {
return nil
}

task, err := avnGen.ServiceTaskCreate(ctx, a.getServiceCommonSpec().Project, a.getObjectMeta().Name, &service.ServiceTaskCreateIn{
TargetVersion: service.TargetVersionType(targetVersion),
TaskType: service.TaskTypeUpgradeCheck,
})
if err != nil {
return fmt.Errorf("cannot create PG upgrade check task: %q", err)
}

finalTaskResult, err := waitForTaskToComplete(ctx, func() (bool, *aiven.ServiceTask, error) {
t, getErr := avn.ServiceTask.Get(ctx, a.getServiceCommonSpec().Project, a.getObjectMeta().Name, task.TaskId)
if getErr != nil {
return true, nil, fmt.Errorf("error fetching service task %s: %q", t.Task.Id, getErr)
}

if t.Task.Success == nil {
return false, nil, nil
}
return true, &t.Task, nil
})
if err != nil {
return err
}
if !*finalTaskResult.Success {
return fmt.Errorf("PG service upgrade check error, version upgrade from %s to %s, result: %s", currentVersion, targetVersion, finalTaskResult.Result)
}
return nil
}

func waitForTaskToComplete[T any](ctx context.Context, f func() (bool, *T, error)) (*T, error) {
var err error
for {
select {
case <-ctx.Done():
return nil, fmt.Errorf("context timeout while retrying operation, error=%q", err)
case <-time.After(waitForTaskToCompleteInterval):
finished, val, err := f()
if finished {
return val, err
}
}
}
}
7 changes: 6 additions & 1 deletion controllers/redis_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"

"github.com/aiven/aiven-go-client/v2"
avngen "github.com/aiven/go-client-codegen"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
ctrl "sigs.k8s.io/controller-runtime"
Expand Down Expand Up @@ -64,7 +65,7 @@ func (a *redisAdapter) getServiceCommonSpec() *v1alpha1.ServiceCommonSpec {
}

func (a *redisAdapter) getUserConfig() any {
return &a.Spec.UserConfig
return a.Spec.UserConfig
}

func (a *redisAdapter) newSecret(ctx context.Context, s *aiven.Service) (*corev1.Secret, error) {
Expand Down Expand Up @@ -93,3 +94,7 @@ func (a *redisAdapter) getServiceType() string {
func (a *redisAdapter) getDiskSpace() string {
return a.Spec.DiskSpace
}

func (a *redisAdapter) performUpgradeTaskIfNeeded(ctx context.Context, avn *aiven.Client, avnGen avngen.Client, old *aiven.Service) error {
return nil
}
Loading

0 comments on commit 5a88f8b

Please sign in to comment.