Skip to content

Commit

Permalink
feat: use generated client in service integrations controller (#629)
Browse files Browse the repository at this point in the history
  • Loading branch information
rriski authored Feb 12, 2024
1 parent 8942de5 commit 017d5c5
Show file tree
Hide file tree
Showing 16 changed files with 164 additions and 97 deletions.
40 changes: 26 additions & 14 deletions controllers/basic_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"time"

"github.com/aiven/aiven-go-client/v2"
avngen "github.com/aiven/go-client-codegen"
"github.com/go-logr/logr"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
Expand Down Expand Up @@ -39,27 +40,28 @@ type (
Scheme *runtime.Scheme
Recorder record.EventRecorder
DefaultToken string
AvGenClient avngen.Client
}

// Handlers represents Aiven API handlers
// It intended to be a layer between Kubernetes and Aiven API that handles all aspects
// of the Aiven services lifecycle.
Handlers interface {
// create or updates an instance on the Aiven side.
createOrUpdate(ctx context.Context, avn *aiven.Client, obj client.Object, refs []client.Object) error
createOrUpdate(ctx context.Context, avn *aiven.Client, avnGen avngen.Client, obj client.Object, refs []client.Object) error

// delete removes an instance on Aiven side.
// If an object is already deleted and cannot be found, it should not be an error. For other deletion
// errors, return an error.
delete(ctx context.Context, avn *aiven.Client, obj client.Object) (bool, error)
delete(ctx context.Context, avn *aiven.Client, avnGen avngen.Client, obj client.Object) (bool, error)

// get retrieve an object and a secret (for example, connection credentials) that is generated on the
// fly based on data from Aiven API. When not applicable to service, it should return nil.
get(ctx context.Context, avn *aiven.Client, obj client.Object) (*corev1.Secret, error)
get(ctx context.Context, avn *aiven.Client, avnGen avngen.Client, obj client.Object) (*corev1.Secret, error)

// checkPreconditions check whether all preconditions for creating (or updating) the resource are in place.
// For example, it is applicable when a service needs to be running before this resource can be created.
checkPreconditions(ctx context.Context, avn *aiven.Client, obj client.Object) (bool, error)
checkPreconditions(ctx context.Context, avn *aiven.Client, avnGen avngen.Client, obj client.Object) (bool, error)
}

// refsObject returns references to dependent resources
Expand Down Expand Up @@ -127,13 +129,20 @@ func (c *Controller) reconcileInstance(ctx context.Context, req ctrl.Request, h
return ctrl.Result{}, fmt.Errorf("cannot initialize aiven client: %w", err)
}

avnGen, err := NewAivenGeneratedClient(token)
if err != nil {
c.Recorder.Event(o, corev1.EventTypeWarning, eventUnableToCreateClient, err.Error())
return ctrl.Result{}, fmt.Errorf("cannot initialize aiven generated client: %w", err)
}

helper := instanceReconcilerHelper{
avn: avn,
k8s: c.Client,
h: h,
log: instanceLogger,
s: clientAuthSecret,
rec: c.Recorder,
avn: avn,
avnGen: avnGen,
k8s: c.Client,
h: h,
log: instanceLogger,
s: clientAuthSecret,
rec: c.Recorder,
}

requeue, err := helper.reconcile(ctx, o)
Expand All @@ -148,6 +157,9 @@ type instanceReconcilerHelper struct {
// avn, Aiven client that is authorized with the instance token
avn *aiven.Client

// avnGen, Aiven client that is authorized with the instance token
avnGen avngen.Client

// h, instance specific handler implementation
h Handlers

Expand Down Expand Up @@ -286,7 +298,7 @@ func (i *instanceReconcilerHelper) checkPreconditions(ctx context.Context, o cli
i.log.Info("all references are good")
}

check, err := i.h.checkPreconditions(ctx, i.avn, o)
check, err := i.h.checkPreconditions(ctx, i.avn, i.avnGen, o)
if err != nil {
i.rec.Event(o, corev1.EventTypeWarning, eventUnableToWaitForPreconditions, err.Error())
return false, fmt.Errorf("unable to wait for preconditions: %w", err)
Expand Down Expand Up @@ -357,7 +369,7 @@ func (i *instanceReconcilerHelper) finalize(ctx context.Context, o v1alpha1.Aive
}

if deletionPolicy == deletionPolicyDelete {
finalised, err = i.h.delete(ctx, i.avn, o)
finalised, err = i.h.delete(ctx, i.avn, i.avnGen, o)
if err != nil {
meta.SetStatusCondition(o.Conditions(), getErrorCondition(errConditionDelete, err))
}
Expand Down Expand Up @@ -423,7 +435,7 @@ func (i *instanceReconcilerHelper) createOrUpdateInstance(ctx context.Context, o
delete(a, processedGenerationAnnotation)
delete(a, instanceIsRunningAnnotation)

if err := i.h.createOrUpdate(ctx, i.avn, o, refs); err != nil {
if err := i.h.createOrUpdate(ctx, i.avn, i.avnGen, o, refs); err != nil {
meta.SetStatusCondition(o.Conditions(), getErrorCondition(errConditionCreateOrUpdate, err))
return fmt.Errorf("unable to create or update aiven instance: %w", err)
}
Expand All @@ -441,7 +453,7 @@ func (i *instanceReconcilerHelper) updateInstanceStateAndSecretUntilRunning(ctx

// Needs to be before o.NoSecret() check because `get` mutates the object's metadata annotations.
// It set the instanceIsRunningAnnotation annotation when the instance is running on Aiven's side.
secret, err := i.h.get(ctx, i.avn, o)
secret, err := i.h.get(ctx, i.avn, i.avnGen, o)
if secret == nil || err != nil {
return err
}
Expand Down
11 changes: 6 additions & 5 deletions controllers/clickhouseuser_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"strconv"

"github.com/aiven/aiven-go-client/v2"
avngen "github.com/aiven/go-client-codegen"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -42,7 +43,7 @@ func (r *ClickhouseUserReconciler) SetupWithManager(mgr ctrl.Manager) error {

type clickhouseUserHandler struct{}

func (h *clickhouseUserHandler) createOrUpdate(ctx context.Context, avn *aiven.Client, obj client.Object, refs []client.Object) error {
func (h *clickhouseUserHandler) createOrUpdate(ctx context.Context, avn *aiven.Client, avnGen avngen.Client, obj client.Object, refs []client.Object) error {
user, err := h.convert(obj)
if err != nil {
return err
Expand All @@ -69,7 +70,7 @@ func (h *clickhouseUserHandler) createOrUpdate(ctx context.Context, avn *aiven.C
return nil
}

func (h *clickhouseUserHandler) delete(ctx context.Context, avn *aiven.Client, obj client.Object) (bool, error) {
func (h *clickhouseUserHandler) delete(ctx context.Context, avn *aiven.Client, avnGen avngen.Client, obj client.Object) (bool, error) {
user, err := h.convert(obj)
if err != nil {
return false, err
Expand All @@ -88,7 +89,7 @@ func (h *clickhouseUserHandler) delete(ctx context.Context, avn *aiven.Client, o
return true, nil
}

func (h *clickhouseUserHandler) get(ctx context.Context, avn *aiven.Client, obj client.Object) (*corev1.Secret, error) {
func (h *clickhouseUserHandler) get(ctx context.Context, avn *aiven.Client, avnGen avngen.Client, obj client.Object) (*corev1.Secret, error) {
user, err := h.convert(obj)
if err != nil {
return nil, err
Expand Down Expand Up @@ -132,7 +133,7 @@ func (h *clickhouseUserHandler) get(ctx context.Context, avn *aiven.Client, obj
return secret, nil
}

func (h *clickhouseUserHandler) checkPreconditions(ctx context.Context, avn *aiven.Client, obj client.Object) (bool, error) {
func (h *clickhouseUserHandler) checkPreconditions(ctx context.Context, avn *aiven.Client, avnGen avngen.Client, obj client.Object) (bool, error) {
user, err := h.convert(obj)
if err != nil {
return false, err
Expand All @@ -141,7 +142,7 @@ func (h *clickhouseUserHandler) checkPreconditions(ctx context.Context, avn *aiv
meta.SetStatusCondition(&user.Status.Conditions,
getInitializedCondition("Preconditions", "Checking preconditions"))

return checkServiceIsRunning(ctx, avn, user.Spec.Project, user.Spec.ServiceName)
return checkServiceIsRunning(ctx, avn, avnGen, user.Spec.Project, user.Spec.ServiceName)
}

func (h *clickhouseUserHandler) convert(i client.Object) (*v1alpha1.ClickhouseUser, error) {
Expand Down
12 changes: 9 additions & 3 deletions controllers/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"strings"

"github.com/aiven/aiven-go-client/v2"
avngen "github.com/aiven/go-client-codegen"
"github.com/liip/sheriff"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -48,8 +49,8 @@ var (
errTerminationProtectionOn = errors.New("termination protection is on")
)

func checkServiceIsRunning(ctx context.Context, c *aiven.Client, project, serviceName string) (bool, error) {
s, err := c.Services.Get(ctx, project, serviceName)
func checkServiceIsRunning(ctx context.Context, avn *aiven.Client, avnGen avngen.Client, project, serviceName string) (bool, error) {
s, err := avnGen.ServiceGet(ctx, project, serviceName)
if err != nil {
// if service is not found, it is not running
if aiven.IsNotFound(err) {
Expand Down Expand Up @@ -125,11 +126,16 @@ func isAivenServerError(err error) bool {
return ok && e.Status >= http.StatusInternalServerError
}

// NewAivenClient returns Aiven client
// NewAivenClient returns Aiven client (aiven/aiven-go-client/v2)
func NewAivenClient(token string) (*aiven.Client, error) {
return aiven.NewTokenClient(token, "k8s-operator/"+version)
}

// NewAivenGeneratedClient returns Aiven generated client client (aiven/go-client-codegen)
func NewAivenGeneratedClient(token string) (avngen.Client, error) {
return avngen.NewClient(avngen.TokenOpt(token), avngen.UserAgentOpt("k8s-operator/"+version))
}

func fromAnyPointer[T any](v *T) T {
if v != nil {
return *v
Expand Down
11 changes: 6 additions & 5 deletions controllers/connectionpool_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"strconv"

"github.com/aiven/aiven-go-client/v2"
avngen "github.com/aiven/go-client-codegen"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -41,7 +42,7 @@ func (r *ConnectionPoolReconciler) SetupWithManager(mgr ctrl.Manager) error {
Complete(r)
}

func (h ConnectionPoolHandler) createOrUpdate(ctx context.Context, avn *aiven.Client, obj client.Object, _ []client.Object) error {
func (h ConnectionPoolHandler) createOrUpdate(ctx context.Context, avn *aiven.Client, avnGen avngen.Client, obj client.Object, refs []client.Object) error {
cp, err := h.convert(obj)
if err != nil {
return err
Expand Down Expand Up @@ -93,7 +94,7 @@ func (h ConnectionPoolHandler) createOrUpdate(ctx context.Context, avn *aiven.Cl
return nil
}

func (h ConnectionPoolHandler) delete(ctx context.Context, avn *aiven.Client, obj client.Object) (bool, error) {
func (h ConnectionPoolHandler) delete(ctx context.Context, avn *aiven.Client, avnGen avngen.Client, obj client.Object) (bool, error) {
cp, err := h.convert(obj)
if err != nil {
return false, err
Expand All @@ -119,7 +120,7 @@ func (h ConnectionPoolHandler) exists(ctx context.Context, avn *aiven.Client, cp
return conPool != nil, nil
}

func (h ConnectionPoolHandler) get(ctx context.Context, avn *aiven.Client, obj client.Object) (*corev1.Secret, error) {
func (h ConnectionPoolHandler) get(ctx context.Context, avn *aiven.Client, avnGen avngen.Client, obj client.Object) (*corev1.Secret, error) {
connPool, err := h.convert(obj)
if err != nil {
return nil, err
Expand Down Expand Up @@ -198,7 +199,7 @@ func (h ConnectionPoolHandler) get(ctx context.Context, avn *aiven.Client, obj c
return newSecret(connPool, stringData, false), nil
}

func (h ConnectionPoolHandler) checkPreconditions(ctx context.Context, avn *aiven.Client, obj client.Object) (bool, error) {
func (h ConnectionPoolHandler) checkPreconditions(ctx context.Context, avn *aiven.Client, avnGen avngen.Client, obj client.Object) (bool, error) {
cp, err := h.convert(obj)
if err != nil {
return false, err
Expand All @@ -207,7 +208,7 @@ func (h ConnectionPoolHandler) checkPreconditions(ctx context.Context, avn *aive
meta.SetStatusCondition(&cp.Status.Conditions,
getInitializedCondition("Preconditions", "Checking preconditions"))

check, err := checkServiceIsRunning(ctx, avn, cp.Spec.Project, cp.Spec.ServiceName)
check, err := checkServiceIsRunning(ctx, avn, avnGen, cp.Spec.Project, cp.Spec.ServiceName)
if err != nil {
return false, err
}
Expand Down
11 changes: 6 additions & 5 deletions controllers/database_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"strconv"

"github.com/aiven/aiven-go-client/v2"
avngen "github.com/aiven/go-client-codegen"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -38,7 +39,7 @@ func (r *DatabaseReconciler) SetupWithManager(mgr ctrl.Manager) error {
Complete(r)
}

func (h DatabaseHandler) createOrUpdate(ctx context.Context, avn *aiven.Client, obj client.Object, refs []client.Object) error {
func (h DatabaseHandler) createOrUpdate(ctx context.Context, avn *aiven.Client, avnGen avngen.Client, obj client.Object, refs []client.Object) error {
db, err := h.convert(obj)
if err != nil {
return err
Expand Down Expand Up @@ -74,7 +75,7 @@ func (h DatabaseHandler) createOrUpdate(ctx context.Context, avn *aiven.Client,
return nil
}

func (h DatabaseHandler) delete(ctx context.Context, avn *aiven.Client, obj client.Object) (bool, error) {
func (h DatabaseHandler) delete(ctx context.Context, avn *aiven.Client, avnGen avngen.Client, obj client.Object) (bool, error) {
db, err := h.convert(obj)
if err != nil {
return false, err
Expand Down Expand Up @@ -105,7 +106,7 @@ func (h DatabaseHandler) exists(ctx context.Context, avn *aiven.Client, db *v1al
return d != nil, nil
}

func (h DatabaseHandler) get(ctx context.Context, avn *aiven.Client, obj client.Object) (*corev1.Secret, error) {
func (h DatabaseHandler) get(ctx context.Context, avn *aiven.Client, avnGen avngen.Client, obj client.Object) (*corev1.Secret, error) {
db, err := h.convert(obj)
if err != nil {
return nil, err
Expand All @@ -125,7 +126,7 @@ func (h DatabaseHandler) get(ctx context.Context, avn *aiven.Client, obj client.
return nil, nil
}

func (h DatabaseHandler) checkPreconditions(ctx context.Context, avn *aiven.Client, obj client.Object) (bool, error) {
func (h DatabaseHandler) checkPreconditions(ctx context.Context, avn *aiven.Client, avnGen avngen.Client, obj client.Object) (bool, error) {
db, err := h.convert(obj)
if err != nil {
return false, err
Expand All @@ -134,7 +135,7 @@ func (h DatabaseHandler) checkPreconditions(ctx context.Context, avn *aiven.Clie
meta.SetStatusCondition(&db.Status.Conditions,
getInitializedCondition("Preconditions", "Checking preconditions"))

return checkServiceIsRunning(ctx, avn, db.Spec.Project, db.Spec.ServiceName)
return checkServiceIsRunning(ctx, avn, avnGen, db.Spec.Project, db.Spec.ServiceName)
}

func (h DatabaseHandler) convert(i client.Object) (*v1alpha1.Database, error) {
Expand Down
11 changes: 6 additions & 5 deletions controllers/generic_service_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"strconv"

"github.com/aiven/aiven-go-client/v2"
avngen "github.com/aiven/go-client-codegen"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -24,7 +25,7 @@ type genericServiceHandler struct {
fabric serviceAdapterFabric
}

func (h *genericServiceHandler) createOrUpdate(ctx context.Context, avn *aiven.Client, obj client.Object, refs []client.Object) error {
func (h *genericServiceHandler) createOrUpdate(ctx context.Context, avn *aiven.Client, avnGen avngen.Client, obj client.Object, refs []client.Object) error {
o, err := h.fabric(avn, obj)
if err != nil {
return err
Expand Down Expand Up @@ -140,7 +141,7 @@ func (h *genericServiceHandler) createOrUpdate(ctx context.Context, avn *aiven.C
return nil
}

func (h *genericServiceHandler) delete(ctx context.Context, avn *aiven.Client, obj client.Object) (bool, error) {
func (h *genericServiceHandler) delete(ctx context.Context, avn *aiven.Client, avnGen avngen.Client, obj client.Object) (bool, error) {
o, err := h.fabric(avn, obj)
if err != nil {
return false, err
Expand All @@ -159,7 +160,7 @@ func (h *genericServiceHandler) delete(ctx context.Context, avn *aiven.Client, o
return false, fmt.Errorf("failed to delete service in Aiven: %w", err)
}

func (h *genericServiceHandler) get(ctx context.Context, avn *aiven.Client, obj client.Object) (*corev1.Secret, error) {
func (h *genericServiceHandler) get(ctx context.Context, avn *aiven.Client, avnGen avngen.Client, obj client.Object) (*corev1.Secret, error) {
o, err := h.fabric(avn, obj)
if err != nil {
return nil, err
Expand All @@ -186,7 +187,7 @@ func (h *genericServiceHandler) get(ctx context.Context, avn *aiven.Client, obj
}

// checkPreconditions not required for now by services to be implemented
func (h *genericServiceHandler) checkPreconditions(ctx context.Context, avn *aiven.Client, obj client.Object) (bool, error) {
func (h *genericServiceHandler) checkPreconditions(ctx context.Context, avn *aiven.Client, avnGen avngen.Client, obj client.Object) (bool, error) {
o, err := h.fabric(avn, obj)
if err != nil {
return false, err
Expand All @@ -197,7 +198,7 @@ func (h *genericServiceHandler) checkPreconditions(ctx context.Context, avn *aiv
// Validates that read_replica is running
// If not, the wrapper controller will try later
if s.IntegrationType == "read_replica" {
r, err := checkServiceIsRunning(ctx, avn, spec.Project, s.SourceServiceName)
r, err := checkServiceIsRunning(ctx, avn, avnGen, spec.Project, s.SourceServiceName)
if !r || err != nil {
return false, err
}
Expand Down
Loading

0 comments on commit 017d5c5

Please sign in to comment.