Skip to content

Commit

Permalink
fix(controllers): object not committed properly
Browse files Browse the repository at this point in the history
  • Loading branch information
dependabot[bot] authored and byashimov committed Nov 30, 2023
1 parent aecc4ee commit b5207b5
Show file tree
Hide file tree
Showing 16 changed files with 172 additions and 150 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

## [MAJOR.MINOR.PATCH] - YYYY-MM-DD

- Set conditions on errors: `Preconditions`, `CreateOrUpdate`, `Delete`. Thanks to @atarax
- Fix object updates lost when reconciler exits before the object is committed
- Add `Kafka` field `userConfig.kafka.transaction_partition_verification_enable`, type `boolean`: Enable
verification that checks that the partition has been added to the transaction before writing transactional
records to the partition
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ boilerplate: controller-gen ## Generate code containing DeepCopy, DeepCopyInto,
$(CONTROLLER_GEN) object:headerFile="hack/boilerplate.go.txt" paths="./..."

.PHONY: generate
generate: boilerplate userconfigs imports manifests docs charts
generate: userconfigs boilerplate imports manifests docs charts

.PHONY: fmt
fmt: ## Run go fmt against code.
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

164 changes: 83 additions & 81 deletions controllers/basic_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,16 @@ package controllers

import (
"context"
"errors"
"fmt"
"strings"
"time"

"github.com/aiven/aiven-go-client/v2"
"github.com/go-logr/logr"
"github.com/hashicorp/go-multierror"
"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
Expand Down Expand Up @@ -80,7 +81,6 @@ const (
eventUnableToDelete = "UnableToDelete"
eventSuccessfullyDeletedAtAiven = "SuccessfullyDeletedAtAiven"
eventAddedFinalizer = "InstanceFinalizerAdded"
eventUnableToAddFinalizer = "UnableToAddFinalizer"
eventWaitingForPreconditions = "WaitingForPreconditions"
eventUnableToWaitForPreconditions = "UnableToWaitForPreconditions"
eventPreconditionsAreMet = "PreconditionsAreMet"
Expand Down Expand Up @@ -126,14 +126,17 @@ func (c *Controller) reconcileInstance(ctx context.Context, req ctrl.Request, h
return ctrl.Result{}, fmt.Errorf("cannot initialize aiven client: %w", err)
}

return instanceReconcilerHelper{
helper := instanceReconcilerHelper{
avn: avn,
k8s: c.Client,
h: h,
log: instanceLogger,
s: clientAuthSecret,
rec: c.Recorder,
}.reconcileInstance(ctx, o)
}

requeue, err := helper.reconcile(ctx, o)
return ctrl.Result{Requeue: requeue, RequeueAfter: requeueTimeout}, err
}

// a helper that closes over all instance specific fields
Expand All @@ -157,53 +160,65 @@ type instanceReconcilerHelper struct {
rec record.EventRecorder
}

func (i instanceReconcilerHelper) reconcileInstance(ctx context.Context, o client.Object) (ctrl.Result, error) {
i.log.Info("reconciling instance")
i.rec.Event(o, corev1.EventTypeNormal, eventReconciliationStarted, "starting reconciliation")

var err error

defer func() {
// Order matters.
// First need to update the object, and then update the status.
// So dependent resources won't see READY before it has been updated with new values
// Clone is used so update won't overwrite in-memory values
// We need to this on outer level because we otherwise wouldn't set status-conditions
// properly when preconditions fail
clone := o.DeepCopyObject().(client.Object)
err = multierror.Append(err, i.k8s.Update(ctx, clone))

// Original object has been updated
o.SetResourceVersion(clone.GetResourceVersion())

// It's ready to cast its status
err = multierror.Append(err, i.k8s.Status().Update(ctx, o))
err = err.(*multierror.Error).ErrorOrNil()
}()

func (i *instanceReconcilerHelper) reconcile(ctx context.Context, o v1alpha1.AivenManagedObject) (bool, error) {
// Deletion
if isMarkedForDeletion(o) {
if controllerutil.ContainsFinalizer(o, instanceDeletionFinalizer) {
return i.finalize(ctx, o)
}
return ctrl.Result{}, nil
return false, nil
}

// Create or update.
// Even if reconcile fails, we need to update the object in kube
// to save conditions and other data.
// So we don't exit on error.
orig := o.DeepCopyObject().(v1alpha1.AivenManagedObject)
requeue, err := i.reconcileInstance(ctx, o)
if equality.Semantic.DeepEqual(orig, o) {
return requeue, err
}

// Order matters.
// First need to update the object, and then update the status.
// So dependent resources won't see READY before it has been updated with new values

// When updated, object status is vanished.
// So we waste a copy for that,
// while the original object must have all the fields updated in runtime
clone := o.DeepCopyObject().(client.Object)
errUpdate := i.k8s.Update(ctx, clone)

// Now we can update the status
o.SetResourceVersion(clone.GetResourceVersion())
errStatus := i.k8s.Status().Update(ctx, o)
errMerged := errors.Join(err, errUpdate, errStatus)
if errMerged != nil {
return true, errMerged
}

return requeue, nil
}

func (i *instanceReconcilerHelper) reconcileInstance(ctx context.Context, o v1alpha1.AivenManagedObject) (bool, error) {
i.log.Info("reconciling instance")
i.rec.Event(o, corev1.EventTypeNormal, eventReconciliationStarted, "starting reconciliation")

// Add finalizers to an instance and associated secret, only if they haven't
// been added in the previous reconciliation loops
if i.s != nil {
if !controllerutil.ContainsFinalizer(i.s, secretProtectionFinalizer) {
i.log.Info("adding finalizer to secret")
if err := addFinalizer(ctx, i.k8s, i.s, secretProtectionFinalizer); err != nil {
return ctrl.Result{}, fmt.Errorf("unable to add finalizer to secret: %w", err)
return false, fmt.Errorf("unable to add finalizer to secret: %w", err)
}
}
}

if !controllerutil.ContainsFinalizer(o, instanceDeletionFinalizer) {
// Adds finalizer. The commit is performed in the outer function
i.log.Info("adding finalizer to instance")
if err := addFinalizer(ctx, i.k8s, o, instanceDeletionFinalizer); err != nil {
i.rec.Eventf(o, corev1.EventTypeWarning, eventUnableToAddFinalizer, err.Error())
return ctrl.Result{}, fmt.Errorf("unable to add finalizer to instance: %w", err)
}
controllerutil.AddFinalizer(o, instanceDeletionFinalizer)
i.rec.Event(o, corev1.EventTypeNormal, eventAddedFinalizer, "instance finalizer added")
}

Expand All @@ -212,58 +227,51 @@ func (i instanceReconcilerHelper) reconcileInstance(ctx context.Context, o clien
refs, err := i.getObjectRefs(ctx, o)
if err != nil {
i.log.Info(fmt.Sprintf("one or more references can't be found yet: %s", err))
return ctrl.Result{Requeue: true, RequeueAfter: requeueTimeout}, nil
return true, nil
}

requeue, err := i.checkPreconditions(ctx, o, refs)
if requeue {
// It must be possible to return requeue and error by design.
// By the time this comment created, there is no such case in checkPreconditions()
return ctrl.Result{Requeue: true, RequeueAfter: requeueTimeout}, err
return true, err
}

if err != nil {
return ctrl.Result{}, err
meta.SetStatusCondition(o.Conditions(), getErrorCondition(errConditionPreconditions, err))
return false, err
}

if !isAlreadyProcessed(o) {
i.rec.Event(o, corev1.EventTypeNormal, eventCreateOrUpdatedAtAiven, "about to create instance at aiven")
if err := i.createOrUpdateInstance(ctx, o, refs); err != nil {
i.rec.Event(o, corev1.EventTypeWarning, eventUnableToCreateOrUpdateAtAiven, err.Error())
return ctrl.Result{}, fmt.Errorf("unable to create or update instance at aiven: %w", err)
return false, fmt.Errorf("unable to create or update instance at aiven: %w", err)
}

i.rec.Event(o, corev1.EventTypeNormal, eventCreatedOrUpdatedAtAiven, "instance was created at aiven but may not be running yet")
}

i.rec.Event(o, corev1.EventTypeNormal, eventWaitingForTheInstanceToBeRunning, "waiting for the instance to be running")
isRunning, err := i.updateInstanceStateAndSecretUntilRunning(ctx, o)
err = i.updateInstanceStateAndSecretUntilRunning(ctx, o)
if err != nil {
if aiven.IsNotFound(err) {
return ctrl.Result{
Requeue: true,
RequeueAfter: requeueTimeout,
}, nil
return true, nil
}

i.rec.Event(o, corev1.EventTypeWarning, eventUnableToWaitForInstanceToBeRunning, err.Error())
return ctrl.Result{}, fmt.Errorf("unable to wait until instance is running: %w", err)
return false, fmt.Errorf("unable to wait until instance is running: %w", err)
}

if !isRunning {
if !IsAlreadyRunning(o) {
i.log.Info("instance is not yet running, triggering requeue")
return ctrl.Result{
Requeue: true,
RequeueAfter: requeueTimeout,
}, nil
return true, nil
}

i.rec.Event(o, corev1.EventTypeNormal, eventInstanceIsRunning, "instance is in a RUNNING state")
i.log.Info("instance was successfully reconciled")

return ctrl.Result{}, nil
return false, nil
}

func (i instanceReconcilerHelper) checkPreconditions(ctx context.Context, o client.Object, refs []client.Object) (bool, error) {
func (i *instanceReconcilerHelper) checkPreconditions(ctx context.Context, o client.Object, refs []client.Object) (bool, error) {
i.rec.Event(o, corev1.EventTypeNormal, eventWaitingForPreconditions, "waiting for preconditions of the instance")

// Checks references
Expand Down Expand Up @@ -293,7 +301,7 @@ func (i instanceReconcilerHelper) checkPreconditions(ctx context.Context, o clie
return false, nil
}

func (i instanceReconcilerHelper) getObjectRefs(ctx context.Context, o client.Object) ([]client.Object, error) {
func (i *instanceReconcilerHelper) getObjectRefs(ctx context.Context, o client.Object) ([]client.Object, error) {
refsObj, ok := o.(refsObject)
if !ok {
return nil, nil
Expand Down Expand Up @@ -330,7 +338,7 @@ func (i instanceReconcilerHelper) getObjectRefs(ctx context.Context, o client.Ob
// finalize runs finalization logic. If the finalization logic fails, don't remove the finalizer so
// that we can retry during the next reconciliation. When applicable, it retrieves an associated object that
// has to be deleted from Kubernetes, and it could be a secret associated with an instance.
func (i instanceReconcilerHelper) finalize(ctx context.Context, o client.Object) (ctrl.Result, error) {
func (i *instanceReconcilerHelper) finalize(ctx context.Context, o v1alpha1.AivenManagedObject) (bool, error) {
i.rec.Event(o, corev1.EventTypeNormal, eventTryingToDeleteAtAiven, "trying to delete instance at aiven")

var err error
Expand All @@ -349,6 +357,9 @@ func (i instanceReconcilerHelper) finalize(ctx context.Context, o client.Object)

if deletionPolicy == deletionPolicyDelete {
finalised, err = i.h.delete(ctx, i.avn, o)
if err != nil {
meta.SetStatusCondition(o.Conditions(), getErrorCondition(errConditionDelete, err))
}
}

// There are dependencies on Aiven side, resets error, so it goes for requeue
Expand All @@ -367,24 +378,21 @@ func (i instanceReconcilerHelper) finalize(ctx context.Context, o client.Object)
finalised = true
} else if aiven.IsNotFound(err) {
i.rec.Event(o, corev1.EventTypeWarning, eventUnableToDeleteAtAiven, err.Error())
return ctrl.Result{}, fmt.Errorf("unable to delete instance at aiven: %w", err)
return false, fmt.Errorf("unable to delete instance at aiven: %w", err)
} else if isAivenServerError(err) {
// If failed to delete, retries
i.log.Info(fmt.Sprintf("unable to delete instance at aiven: %s", err))
err = nil
} else {
i.rec.Event(o, corev1.EventTypeWarning, eventUnableToDelete, err.Error())
return ctrl.Result{}, fmt.Errorf("unable to delete instance: %w", err)
return false, fmt.Errorf("unable to delete instance: %w", err)
}
}

// checking if instance was finalized, if not triggering a requeue
if !finalised {
i.log.Info("instance is not yet deleted at aiven, triggering requeue")
return ctrl.Result{
Requeue: true,
RequeueAfter: requeueTimeout,
}, nil
return true, nil
}

i.log.Info("instance was successfully deleted at aiven, removing finalizer")
Expand All @@ -393,28 +401,29 @@ func (i instanceReconcilerHelper) finalize(ctx context.Context, o client.Object)
// remove finalizer, once all finalizers have been removed, the object will be deleted.
if err := removeFinalizer(ctx, i.k8s, o, instanceDeletionFinalizer); err != nil {
i.rec.Event(o, corev1.EventTypeWarning, eventUnableToDeleteFinalizer, err.Error())
return ctrl.Result{}, fmt.Errorf("unable to remove finalizer: %w", err)
return false, fmt.Errorf("unable to remove finalizer: %w", err)
}

i.log.Info("finalizer was removed, instance is deleted")
return ctrl.Result{}, nil
return false, nil
}

// isInvalidTokenError checks if the error is related to invalid token
func (i instanceReconcilerHelper) isInvalidTokenError(err error) bool {
func (i *instanceReconcilerHelper) isInvalidTokenError(err error) bool {
// When an instance was created but pointing to an invalid API token
// and no generation was ever processed, allow deleting such instance
msg := err.Error()
return strings.Contains(msg, "Invalid token") || strings.Contains(msg, "Missing (expired) db token")
}

func (i instanceReconcilerHelper) createOrUpdateInstance(ctx context.Context, o client.Object, refs []client.Object) error {
func (i *instanceReconcilerHelper) createOrUpdateInstance(ctx context.Context, o v1alpha1.AivenManagedObject, refs []client.Object) error {
i.log.Info("generation wasn't processed, creation or updating instance on aiven side")
a := o.GetAnnotations()
delete(a, processedGenerationAnnotation)
delete(a, instanceIsRunningAnnotation)

if err := i.h.createOrUpdate(ctx, i.avn, o, refs); err != nil {
meta.SetStatusCondition(o.Conditions(), getErrorCondition(errConditionCreateOrUpdate, err))
return fmt.Errorf("unable to create or update aiven instance: %w", err)
}

Expand All @@ -426,25 +435,18 @@ func (i instanceReconcilerHelper) createOrUpdateInstance(ctx context.Context, o
return nil
}

func (i instanceReconcilerHelper) updateInstanceStateAndSecretUntilRunning(ctx context.Context, o client.Object) (bool, error) {
func (i *instanceReconcilerHelper) updateInstanceStateAndSecretUntilRunning(ctx context.Context, o client.Object) error {
i.log.Info("checking if instance is ready")

serviceSecret, err := i.h.get(ctx, i.avn, o)
if err != nil {
return false, err
} else if serviceSecret != nil {
if err = i.createOrUpdateSecret(ctx, o, serviceSecret); err != nil {
return false, fmt.Errorf("unable to create or update aiven secret: %w", err)
}
secret, err := i.h.get(ctx, i.avn, o)
if secret == nil || err != nil {
return err
}
return IsAlreadyRunning(o), err

}

func (i instanceReconcilerHelper) createOrUpdateSecret(ctx context.Context, owner client.Object, want *corev1.Secret) error {
_, err := controllerutil.CreateOrUpdate(ctx, i.k8s, want, func() error {
return ctrl.SetControllerReference(owner, want, i.k8s.Scheme())
_, err = controllerutil.CreateOrUpdate(ctx, i.k8s, secret, func() error {
return controllerutil.SetControllerReference(o, secret, i.k8s.Scheme())
})

return err
}

Expand Down
4 changes: 0 additions & 4 deletions controllers/clickhouseuser_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,6 @@ func (h *clickhouseUserHandler) createOrUpdate(ctx context.Context, avn *aiven.C
return err
}

if err != nil && !aiven.IsAlreadyExists(err) {
return fmt.Errorf("cannot createOrUpdate clickhouse user on aiven side: %w", err)
}

user.Status.UUID = r.User.UUID

meta.SetStatusCondition(&user.Status.Conditions,
Expand Down
Loading

0 comments on commit b5207b5

Please sign in to comment.