Skip to content

Commit

Permalink
refactor(controllers): set conditions on top level
Browse files Browse the repository at this point in the history
  • Loading branch information
byashimov committed Nov 21, 2023
1 parent 4e7f8ac commit f244b5e
Show file tree
Hide file tree
Showing 11 changed files with 60 additions and 91 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

## [MAJOR.MINOR.PATCH] - YYYY-MM-DD

- Set conditions on errors: `Preconditions`, `CreateOrUpdate`, `Delete`. Thanks to @atarax

## v0.15.0 - 2023-11-17

- Upgrade to Go 1.21
Expand Down
54 changes: 28 additions & 26 deletions controllers/basic_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/hashicorp/go-multierror"
"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
Expand Down Expand Up @@ -126,14 +127,31 @@ func (c *Controller) reconcileInstance(ctx context.Context, req ctrl.Request, h
return ctrl.Result{}, fmt.Errorf("cannot initialize aiven client: %w", err)
}

return instanceReconcilerHelper{
result, err := instanceReconcilerHelper{
avn: avn,
k8s: c.Client,
h: h,
log: instanceLogger,
s: clientAuthSecret,
rec: c.Recorder,
}.reconcileInstance(ctx, o)

// Order matters.
// First need to update the object, and then update the status.
// So dependent resources won't see READY before it has been updated with new values
// Clone is used so update won't overwrite in-memory values
// We need to this on outer level because we otherwise wouldn't set status-conditions
// properly when preconditions fail
clone := o.DeepCopyObject().(client.Object)
err = multierror.Append(err, c.Client.Update(ctx, clone))

// The original object has been updated
o.SetResourceVersion(clone.GetResourceVersion())

// It's ready to cast its status
err = multierror.Append(err, c.Client.Status().Update(ctx, o))
err = err.(*multierror.Error).ErrorOrNil()
return result, err
}

// a helper that closes over all instance specific fields
Expand All @@ -157,30 +175,10 @@ type instanceReconcilerHelper struct {
rec record.EventRecorder
}

func (i instanceReconcilerHelper) reconcileInstance(ctx context.Context, o client.Object) (ctrl.Result, error) {
func (i instanceReconcilerHelper) reconcileInstance(ctx context.Context, o v1alpha1.AivenManagedObject) (ctrl.Result, error) {
i.log.Info("reconciling instance")
i.rec.Event(o, corev1.EventTypeNormal, eventReconciliationStarted, "starting reconciliation")

var err error

defer func() {
// Order matters.
// First need to update the object, and then update the status.
// So dependent resources won't see READY before it has been updated with new values
// Clone is used so update won't overwrite in-memory values
// We need to this on outer level because we otherwise wouldn't set status-conditions
// properly when preconditions fail
clone := o.DeepCopyObject().(client.Object)
err = multierror.Append(err, i.k8s.Update(ctx, clone))

// Original object has been updated
o.SetResourceVersion(clone.GetResourceVersion())

// It's ready to cast its status
err = multierror.Append(err, i.k8s.Status().Update(ctx, o))
err = err.(*multierror.Error).ErrorOrNil()
}()

if isMarkedForDeletion(o) {
if controllerutil.ContainsFinalizer(o, instanceDeletionFinalizer) {
return i.finalize(ctx, o)
Expand Down Expand Up @@ -217,11 +215,11 @@ func (i instanceReconcilerHelper) reconcileInstance(ctx context.Context, o clien

requeue, err := i.checkPreconditions(ctx, o, refs)
if requeue {
// It must be possible to return requeue and error by design.
// By the time this comment created, there is no such case in checkPreconditions()
return ctrl.Result{Requeue: true, RequeueAfter: requeueTimeout}, err
}

if err != nil {
meta.SetStatusCondition(o.Conditions(), getErrorCondition(errConditionPreconditions, err))
return ctrl.Result{}, err
}

Expand Down Expand Up @@ -330,7 +328,7 @@ func (i instanceReconcilerHelper) getObjectRefs(ctx context.Context, o client.Ob
// finalize runs finalization logic. If the finalization logic fails, don't remove the finalizer so
// that we can retry during the next reconciliation. When applicable, it retrieves an associated object that
// has to be deleted from Kubernetes, and it could be a secret associated with an instance.
func (i instanceReconcilerHelper) finalize(ctx context.Context, o client.Object) (ctrl.Result, error) {
func (i instanceReconcilerHelper) finalize(ctx context.Context, o v1alpha1.AivenManagedObject) (ctrl.Result, error) {
i.rec.Event(o, corev1.EventTypeNormal, eventTryingToDeleteAtAiven, "trying to delete instance at aiven")

var err error
Expand All @@ -349,6 +347,9 @@ func (i instanceReconcilerHelper) finalize(ctx context.Context, o client.Object)

if deletionPolicy == deletionPolicyDelete {
finalised, err = i.h.delete(ctx, i.avn, o)
if err != nil {
meta.SetStatusCondition(o.Conditions(), getErrorCondition(errConditionDelete, err))
}
}

// There are dependencies on Aiven side, resets error, so it goes for requeue
Expand Down Expand Up @@ -408,13 +409,14 @@ func (i instanceReconcilerHelper) isInvalidTokenError(err error) bool {
return strings.Contains(msg, "Invalid token") || strings.Contains(msg, "Missing (expired) db token")
}

func (i instanceReconcilerHelper) createOrUpdateInstance(ctx context.Context, o client.Object, refs []client.Object) error {
func (i instanceReconcilerHelper) createOrUpdateInstance(ctx context.Context, o v1alpha1.AivenManagedObject, refs []client.Object) error {
i.log.Info("generation wasn't processed, creation or updating instance on aiven side")
a := o.GetAnnotations()
delete(a, processedGenerationAnnotation)
delete(a, instanceIsRunningAnnotation)

if err := i.h.createOrUpdate(ctx, i.avn, o, refs); err != nil {
meta.SetStatusCondition(o.Conditions(), getErrorCondition(errConditionCreateOrUpdate, err))
return fmt.Errorf("unable to create or update aiven instance: %w", err)
}

Expand Down
4 changes: 0 additions & 4 deletions controllers/clickhouseuser_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,6 @@ func (h *clickhouseUserHandler) createOrUpdate(ctx context.Context, avn *aiven.C
return err
}

if err != nil && !aiven.IsAlreadyExists(err) {
return fmt.Errorf("cannot createOrUpdate clickhouse user on aiven side: %w", err)
}

user.Status.UUID = r.User.UUID

meta.SetStatusCondition(&user.Status.Conditions,
Expand Down
12 changes: 10 additions & 2 deletions controllers/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,14 @@ const (
deletionPolicyDelete = "Delete"
)

type errCondition string

const (
errConditionDelete errCondition = "Delete"
errConditionPreconditions errCondition = "Preconditions"
errConditionCreateOrUpdate errCondition = "CreateOrUpdate"
)

var (
version = "dev"
errTerminationProtectionOn = errors.New("termination protection is on")
Expand Down Expand Up @@ -71,11 +79,11 @@ func getRunningCondition(status metav1.ConditionStatus, reason, message string)
}
}

func getErrorCondition(reason string, err error) metav1.Condition {
func getErrorCondition(reason errCondition, err error) metav1.Condition {
return metav1.Condition{
Type: conditionTypeError,
Status: metav1.ConditionUnknown,
Reason: reason,
Reason: string(reason),
Message: err.Error(),
}
}
Expand Down
5 changes: 0 additions & 5 deletions controllers/connectionpool_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ func (h ConnectionPoolHandler) createOrUpdate(ctx context.Context, avn *aiven.Cl

exists, err := h.exists(ctx, avn, cp)
if err != nil {
meta.SetStatusCondition(&cp.Status.Conditions, getErrorCondition("CheckExists", err))
return err
}
var reason string
Expand All @@ -62,7 +61,6 @@ func (h ConnectionPoolHandler) createOrUpdate(ctx context.Context, avn *aiven.Cl
Username: optionalStringPointer(cp.Spec.Username),
})
if err != nil && !aiven.IsAlreadyExists(err) {
meta.SetStatusCondition(&cp.Status.Conditions, getErrorCondition("Create", err))
return err
}
reason = "Created"
Expand All @@ -75,7 +73,6 @@ func (h ConnectionPoolHandler) createOrUpdate(ctx context.Context, avn *aiven.Cl
Username: optionalStringPointer(cp.Spec.Username),
})
if err != nil {
meta.SetStatusCondition(&cp.Status.Conditions, getErrorCondition("Update", err))
return err
}
reason = "Updated"
Expand Down Expand Up @@ -103,7 +100,6 @@ func (h ConnectionPoolHandler) delete(ctx context.Context, avn *aiven.Client, ob

err = avn.ConnectionPools.Delete(ctx, cp.Spec.Project, cp.Spec.ServiceName, cp.Name)
if err != nil && !aiven.IsNotFound(err) {
meta.SetStatusCondition(&cp.Status.Conditions, getErrorCondition("Delete", err))
return false, err
}

Expand Down Expand Up @@ -204,7 +200,6 @@ func (h ConnectionPoolHandler) checkPreconditions(ctx context.Context, avn *aive

check, err := checkServiceIsRunning(ctx, avn, cp.Spec.Project, cp.Spec.ServiceName)
if err != nil {
meta.SetStatusCondition(&cp.Status.Conditions, getErrorCondition("Preconditions", err))
return false, err
}

Expand Down
9 changes: 1 addition & 8 deletions controllers/database_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ func (h DatabaseHandler) createOrUpdate(ctx context.Context, avn *aiven.Client,
exists, err := h.exists(ctx, avn, db)

if err != nil {
meta.SetStatusCondition(&db.Status.Conditions, getErrorCondition("CheckExists", err))
return err
}

Expand All @@ -58,7 +57,6 @@ func (h DatabaseHandler) createOrUpdate(ctx context.Context, avn *aiven.Client,
LcType: db.Spec.LcCtype,
})
if err != nil {
meta.SetStatusCondition(&db.Status.Conditions, getErrorCondition("CreateOrUpdate", err))
return fmt.Errorf("cannot create database on Aiven side: %w", err)
}
}
Expand Down Expand Up @@ -93,7 +91,6 @@ func (h DatabaseHandler) delete(ctx context.Context, avn *aiven.Client, obj clie
db.Spec.ServiceName,
db.Name)
if err != nil && !aiven.IsNotFound(err) {
meta.SetStatusCondition(&db.Status.Conditions, getErrorCondition("Delete", err))
return false, err
}

Expand Down Expand Up @@ -138,11 +135,7 @@ func (h DatabaseHandler) checkPreconditions(ctx context.Context, avn *aiven.Clie
meta.SetStatusCondition(&db.Status.Conditions,
getInitializedCondition("Preconditions", "Checking preconditions"))

running, err := checkServiceIsRunning(ctx, avn, db.Spec.Project, db.Spec.ServiceName)
if err != nil {
meta.SetStatusCondition(&db.Status.Conditions, getErrorCondition("Preconditions", err))
}
return running, err
return checkServiceIsRunning(ctx, avn, db.Spec.Project, db.Spec.ServiceName)
}

func (h DatabaseHandler) convert(i client.Object) (*v1alpha1.Database, error) {
Expand Down
13 changes: 2 additions & 11 deletions controllers/generic_service_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ func (h *genericServiceHandler) createOrUpdate(ctx context.Context, avn *aiven.C
_, err = avn.Services.Get(ctx, spec.Project, ometa.Name)
exists := err == nil
if !exists && !aiven.IsNotFound(err) {
meta.SetStatusCondition(&o.getServiceStatus().Conditions, getErrorCondition("CheckExists", err))
return fmt.Errorf("failed to fetch service: %w", err)
}

Expand Down Expand Up @@ -81,14 +80,12 @@ func (h *genericServiceHandler) createOrUpdate(ctx context.Context, avn *aiven.C

_, err = avn.Services.Create(ctx, spec.Project, req)
if err != nil {
meta.SetStatusCondition(&o.getServiceStatus().Conditions, getErrorCondition("Create", err))
return fmt.Errorf("failed to create service: %w", err)
}
} else {
reason = "Updated"
userConfig, err := UpdateUserConfiguration(o.getUserConfig())
if err != nil {
meta.SetStatusCondition(&o.getServiceStatus().Conditions, getErrorCondition("Update", err))
return err
}

Expand All @@ -104,7 +101,6 @@ func (h *genericServiceHandler) createOrUpdate(ctx context.Context, avn *aiven.C
}
_, err = avn.Services.Update(ctx, spec.Project, ometa.Name, req)
if err != nil {
meta.SetStatusCondition(&o.getServiceStatus().Conditions, getErrorCondition("Update", err))
return fmt.Errorf("failed to update service: %w", err)
}
}
Expand All @@ -120,7 +116,6 @@ func (h *genericServiceHandler) createOrUpdate(ctx context.Context, avn *aiven.C
}
_, err = avn.ServiceTags.Set(ctx, spec.Project, ometa.Name, req)
if err != nil {
meta.SetStatusCondition(&o.getServiceStatus().Conditions, getErrorCondition("Update", err))
return fmt.Errorf("failed to update tags: %w", err)
}

Expand Down Expand Up @@ -154,7 +149,6 @@ func (h *genericServiceHandler) delete(ctx context.Context, avn *aiven.Client, o
return true, nil
}

meta.SetStatusCondition(&o.getServiceStatus().Conditions, getErrorCondition("Delete", err))
return false, fmt.Errorf("failed to delete service in Aiven: %w", err)
}

Expand Down Expand Up @@ -197,11 +191,8 @@ func (h *genericServiceHandler) checkPreconditions(ctx context.Context, avn *aiv
// If not, the wrapper controller will try later
if s.IntegrationType == "read_replica" {
r, err := checkServiceIsRunning(ctx, avn, spec.Project, s.SourceServiceName)
if err != nil {
meta.SetStatusCondition(&o.getServiceStatus().Conditions, getErrorCondition("Preconditions", err))
}
if !(r && err == nil) {
return false, nil
if !r || err != nil {
return false, err
}
}
}
Expand Down
7 changes: 1 addition & 6 deletions controllers/kafkaacl_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ func (h KafkaACLHandler) createOrUpdate(ctx context.Context, avn *aiven.Client,
// Tries to delete it instead
_, err = h.delete(ctx, avn, obj)
if err != nil {
meta.SetStatusCondition(&acl.Status.Conditions, getErrorCondition("Delete", err))
return err
}

Expand Down Expand Up @@ -161,11 +160,7 @@ func (h KafkaACLHandler) checkPreconditions(ctx context.Context, avn *aiven.Clie
meta.SetStatusCondition(&acl.Status.Conditions,
getInitializedCondition("Preconditions", "Checking preconditions"))

running, err := checkServiceIsRunning(ctx, avn, acl.Spec.Project, acl.Spec.ServiceName)
if err != nil {
meta.SetStatusCondition(&acl.Status.Conditions, getErrorCondition("Delete", err))
}
return running, err
return checkServiceIsRunning(ctx, avn, acl.Spec.Project, acl.Spec.ServiceName)
}

func (h KafkaACLHandler) convert(i client.Object) (*v1alpha1.KafkaACL, error) {
Expand Down
Loading

0 comments on commit f244b5e

Please sign in to comment.