Skip to content

Commit

Permalink
refactor(controllers): set conditions on top level
Browse files Browse the repository at this point in the history
  • Loading branch information
dependabot[bot] authored and byashimov committed Nov 29, 2023
1 parent 8320dbf commit 045f99e
Show file tree
Hide file tree
Showing 15 changed files with 83 additions and 123 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

## [MAJOR.MINOR.PATCH] - YYYY-MM-DD

- Set conditions on errors: `Preconditions`, `CreateOrUpdate`, `Delete`. Thanks to @atarax
- Add `Kafka` field `userConfig.kafka.transaction_partition_verification_enable`, type `boolean`: Enable
verification that checks that the partition has been added to the transaction before writing transactional
records to the partition
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ boilerplate: controller-gen ## Generate code containing DeepCopy, DeepCopyInto,
$(CONTROLLER_GEN) object:headerFile="hack/boilerplate.go.txt" paths="./..."

.PHONY: generate
generate: boilerplate userconfigs imports manifests docs charts
generate: userconfigs boilerplate imports manifests docs charts

.PHONY: fmt
fmt: ## Run go fmt against code.
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

97 changes: 46 additions & 51 deletions controllers/basic_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ import (

"github.com/aiven/aiven-go-client/v2"
"github.com/go-logr/logr"
"github.com/hashicorp/go-multierror"
"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
Expand Down Expand Up @@ -126,14 +126,15 @@ func (c *Controller) reconcileInstance(ctx context.Context, req ctrl.Request, h
return ctrl.Result{}, fmt.Errorf("cannot initialize aiven client: %w", err)
}

return instanceReconcilerHelper{
helper := instanceReconcilerHelper{
avn: avn,
k8s: c.Client,
h: h,
log: instanceLogger,
s: clientAuthSecret,
rec: c.Recorder,
}.reconcileInstance(ctx, o)
}
return helper.reconcile(ctx, o)
}

// a helper that closes over all instance specific fields
Expand All @@ -157,37 +158,35 @@ type instanceReconcilerHelper struct {
rec record.EventRecorder
}

func (i instanceReconcilerHelper) reconcileInstance(ctx context.Context, o client.Object) (ctrl.Result, error) {
i.log.Info("reconciling instance")
i.rec.Event(o, corev1.EventTypeNormal, eventReconciliationStarted, "starting reconciliation")

var err error

defer func() {
// Order matters.
// First need to update the object, and then update the status.
// So dependent resources won't see READY before it has been updated with new values
// Clone is used so update won't overwrite in-memory values
// We need to this on outer level because we otherwise wouldn't set status-conditions
// properly when preconditions fail
clone := o.DeepCopyObject().(client.Object)
err = multierror.Append(err, i.k8s.Update(ctx, clone))

// Original object has been updated
o.SetResourceVersion(clone.GetResourceVersion())

// It's ready to cast its status
err = multierror.Append(err, i.k8s.Status().Update(ctx, o))
err = err.(*multierror.Error).ErrorOrNil()
}()

func (i *instanceReconcilerHelper) reconcile(ctx context.Context, o v1alpha1.AivenManagedObject) (ctrl.Result, error) {
// Deletion
if isMarkedForDeletion(o) {
if controllerutil.ContainsFinalizer(o, instanceDeletionFinalizer) {
return i.finalize(ctx, o)
}
return ctrl.Result{}, nil
}

// Create or update
var err error
var result ctrl.Result
_, err = controllerutil.CreateOrUpdate(ctx, i.k8s, o, func() error {
result, err = i.reconcileInstance(ctx, o)
return err
})

if err != nil {
return result, err
}

err = i.k8s.Status().Update(ctx, o)
return result, err
}

func (i *instanceReconcilerHelper) reconcileInstance(ctx context.Context, o v1alpha1.AivenManagedObject) (ctrl.Result, error) {
i.log.Info("reconciling instance")
i.rec.Event(o, corev1.EventTypeNormal, eventReconciliationStarted, "starting reconciliation")

// Add finalizers to an instance and associated secret, only if they haven't
// been added in the previous reconciliation loops
if i.s != nil {
Expand All @@ -198,6 +197,7 @@ func (i instanceReconcilerHelper) reconcileInstance(ctx context.Context, o clien
}
}
}

if !controllerutil.ContainsFinalizer(o, instanceDeletionFinalizer) {
i.log.Info("adding finalizer to instance")
if err := addFinalizer(ctx, i.k8s, o, instanceDeletionFinalizer); err != nil {
Expand All @@ -217,11 +217,11 @@ func (i instanceReconcilerHelper) reconcileInstance(ctx context.Context, o clien

requeue, err := i.checkPreconditions(ctx, o, refs)
if requeue {
// It must be possible to return requeue and error by design.
// By the time this comment created, there is no such case in checkPreconditions()
return ctrl.Result{Requeue: true, RequeueAfter: requeueTimeout}, err
}

if err != nil {
meta.SetStatusCondition(o.Conditions(), getErrorCondition(errConditionPreconditions, err))
return ctrl.Result{}, err
}

Expand All @@ -236,7 +236,7 @@ func (i instanceReconcilerHelper) reconcileInstance(ctx context.Context, o clien
}

i.rec.Event(o, corev1.EventTypeNormal, eventWaitingForTheInstanceToBeRunning, "waiting for the instance to be running")
isRunning, err := i.updateInstanceStateAndSecretUntilRunning(ctx, o)
err = i.updateInstanceStateAndSecretUntilRunning(ctx, o)
if err != nil {
if aiven.IsNotFound(err) {
return ctrl.Result{
Expand All @@ -249,7 +249,7 @@ func (i instanceReconcilerHelper) reconcileInstance(ctx context.Context, o clien
return ctrl.Result{}, fmt.Errorf("unable to wait until instance is running: %w", err)
}

if !isRunning {
if !IsAlreadyRunning(o) {
i.log.Info("instance is not yet running, triggering requeue")
return ctrl.Result{
Requeue: true,
Expand All @@ -259,11 +259,10 @@ func (i instanceReconcilerHelper) reconcileInstance(ctx context.Context, o clien

i.rec.Event(o, corev1.EventTypeNormal, eventInstanceIsRunning, "instance is in a RUNNING state")
i.log.Info("instance was successfully reconciled")

return ctrl.Result{}, nil
}

func (i instanceReconcilerHelper) checkPreconditions(ctx context.Context, o client.Object, refs []client.Object) (bool, error) {
func (i *instanceReconcilerHelper) checkPreconditions(ctx context.Context, o client.Object, refs []client.Object) (bool, error) {
i.rec.Event(o, corev1.EventTypeNormal, eventWaitingForPreconditions, "waiting for preconditions of the instance")

// Checks references
Expand Down Expand Up @@ -293,7 +292,7 @@ func (i instanceReconcilerHelper) checkPreconditions(ctx context.Context, o clie
return false, nil
}

func (i instanceReconcilerHelper) getObjectRefs(ctx context.Context, o client.Object) ([]client.Object, error) {
func (i *instanceReconcilerHelper) getObjectRefs(ctx context.Context, o client.Object) ([]client.Object, error) {
refsObj, ok := o.(refsObject)
if !ok {
return nil, nil
Expand Down Expand Up @@ -330,7 +329,7 @@ func (i instanceReconcilerHelper) getObjectRefs(ctx context.Context, o client.Ob
// finalize runs finalization logic. If the finalization logic fails, don't remove the finalizer so
// that we can retry during the next reconciliation. When applicable, it retrieves an associated object that
// has to be deleted from Kubernetes, and it could be a secret associated with an instance.
func (i instanceReconcilerHelper) finalize(ctx context.Context, o client.Object) (ctrl.Result, error) {
func (i *instanceReconcilerHelper) finalize(ctx context.Context, o v1alpha1.AivenManagedObject) (ctrl.Result, error) {
i.rec.Event(o, corev1.EventTypeNormal, eventTryingToDeleteAtAiven, "trying to delete instance at aiven")

var err error
Expand All @@ -349,6 +348,9 @@ func (i instanceReconcilerHelper) finalize(ctx context.Context, o client.Object)

if deletionPolicy == deletionPolicyDelete {
finalised, err = i.h.delete(ctx, i.avn, o)
if err != nil {
meta.SetStatusCondition(o.Conditions(), getErrorCondition(errConditionDelete, err))
}
}

// There are dependencies on Aiven side, resets error, so it goes for requeue
Expand Down Expand Up @@ -401,20 +403,21 @@ func (i instanceReconcilerHelper) finalize(ctx context.Context, o client.Object)
}

// isInvalidTokenError checks if the error is related to invalid token
func (i instanceReconcilerHelper) isInvalidTokenError(err error) bool {
func (i *instanceReconcilerHelper) isInvalidTokenError(err error) bool {
// When an instance was created but pointing to an invalid API token
// and no generation was ever processed, allow deleting such instance
msg := err.Error()
return strings.Contains(msg, "Invalid token") || strings.Contains(msg, "Missing (expired) db token")
}

func (i instanceReconcilerHelper) createOrUpdateInstance(ctx context.Context, o client.Object, refs []client.Object) error {
func (i *instanceReconcilerHelper) createOrUpdateInstance(ctx context.Context, o v1alpha1.AivenManagedObject, refs []client.Object) error {
i.log.Info("generation wasn't processed, creation or updating instance on aiven side")
a := o.GetAnnotations()
delete(a, processedGenerationAnnotation)
delete(a, instanceIsRunningAnnotation)

if err := i.h.createOrUpdate(ctx, i.avn, o, refs); err != nil {
meta.SetStatusCondition(o.Conditions(), getErrorCondition(errConditionCreateOrUpdate, err))
return fmt.Errorf("unable to create or update aiven instance: %w", err)
}

Expand All @@ -426,24 +429,16 @@ func (i instanceReconcilerHelper) createOrUpdateInstance(ctx context.Context, o
return nil
}

func (i instanceReconcilerHelper) updateInstanceStateAndSecretUntilRunning(ctx context.Context, o client.Object) (bool, error) {
func (i *instanceReconcilerHelper) updateInstanceStateAndSecretUntilRunning(ctx context.Context, o client.Object) error {
i.log.Info("checking if instance is ready")

serviceSecret, err := i.h.get(ctx, i.avn, o)
if err != nil {
return false, err
} else if serviceSecret != nil {
if err = i.createOrUpdateSecret(ctx, o, serviceSecret); err != nil {
return false, fmt.Errorf("unable to create or update aiven secret: %w", err)
}
secret, err := i.h.get(ctx, i.avn, o)
if secret == nil || err != nil {
return err
}
return IsAlreadyRunning(o), err

}

func (i instanceReconcilerHelper) createOrUpdateSecret(ctx context.Context, owner client.Object, want *corev1.Secret) error {
_, err := controllerutil.CreateOrUpdate(ctx, i.k8s, want, func() error {
return ctrl.SetControllerReference(owner, want, i.k8s.Scheme())
_, err = controllerutil.CreateOrUpdate(ctx, i.k8s, secret, func() error {
return controllerutil.SetControllerReference(o, secret, i.k8s.Scheme())
})
return err
}
Expand Down
4 changes: 0 additions & 4 deletions controllers/clickhouseuser_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,6 @@ func (h *clickhouseUserHandler) createOrUpdate(ctx context.Context, avn *aiven.C
return err
}

if err != nil && !aiven.IsAlreadyExists(err) {
return fmt.Errorf("cannot createOrUpdate clickhouse user on aiven side: %w", err)
}

user.Status.UUID = r.User.UUID

meta.SetStatusCondition(&user.Status.Conditions,
Expand Down
12 changes: 10 additions & 2 deletions controllers/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,14 @@ const (
deletionPolicyDelete = "Delete"
)

type errCondition string

const (
errConditionDelete errCondition = "Delete"
errConditionPreconditions errCondition = "Preconditions"
errConditionCreateOrUpdate errCondition = "CreateOrUpdate"
)

var (
version = "dev"
errTerminationProtectionOn = errors.New("termination protection is on")
Expand Down Expand Up @@ -71,11 +79,11 @@ func getRunningCondition(status metav1.ConditionStatus, reason, message string)
}
}

func getErrorCondition(reason string, err error) metav1.Condition {
func getErrorCondition(reason errCondition, err error) metav1.Condition {
return metav1.Condition{
Type: conditionTypeError,
Status: metav1.ConditionUnknown,
Reason: reason,
Reason: string(reason),
Message: err.Error(),
}
}
Expand Down
5 changes: 0 additions & 5 deletions controllers/connectionpool_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ func (h ConnectionPoolHandler) createOrUpdate(ctx context.Context, avn *aiven.Cl

exists, err := h.exists(ctx, avn, cp)
if err != nil {
meta.SetStatusCondition(&cp.Status.Conditions, getErrorCondition("CheckExists", err))
return err
}
var reason string
Expand All @@ -62,7 +61,6 @@ func (h ConnectionPoolHandler) createOrUpdate(ctx context.Context, avn *aiven.Cl
Username: optionalStringPointer(cp.Spec.Username),
})
if err != nil && !aiven.IsAlreadyExists(err) {
meta.SetStatusCondition(&cp.Status.Conditions, getErrorCondition("Create", err))
return err
}
reason = "Created"
Expand All @@ -75,7 +73,6 @@ func (h ConnectionPoolHandler) createOrUpdate(ctx context.Context, avn *aiven.Cl
Username: optionalStringPointer(cp.Spec.Username),
})
if err != nil {
meta.SetStatusCondition(&cp.Status.Conditions, getErrorCondition("Update", err))
return err
}
reason = "Updated"
Expand Down Expand Up @@ -103,7 +100,6 @@ func (h ConnectionPoolHandler) delete(ctx context.Context, avn *aiven.Client, ob

err = avn.ConnectionPools.Delete(ctx, cp.Spec.Project, cp.Spec.ServiceName, cp.Name)
if err != nil && !aiven.IsNotFound(err) {
meta.SetStatusCondition(&cp.Status.Conditions, getErrorCondition("Delete", err))
return false, err
}

Expand Down Expand Up @@ -204,7 +200,6 @@ func (h ConnectionPoolHandler) checkPreconditions(ctx context.Context, avn *aive

check, err := checkServiceIsRunning(ctx, avn, cp.Spec.Project, cp.Spec.ServiceName)
if err != nil {
meta.SetStatusCondition(&cp.Status.Conditions, getErrorCondition("Preconditions", err))
return false, err
}

Expand Down
9 changes: 1 addition & 8 deletions controllers/database_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ func (h DatabaseHandler) createOrUpdate(ctx context.Context, avn *aiven.Client,
exists, err := h.exists(ctx, avn, db)

if err != nil {
meta.SetStatusCondition(&db.Status.Conditions, getErrorCondition("CheckExists", err))
return err
}

Expand All @@ -58,7 +57,6 @@ func (h DatabaseHandler) createOrUpdate(ctx context.Context, avn *aiven.Client,
LcType: db.Spec.LcCtype,
})
if err != nil {
meta.SetStatusCondition(&db.Status.Conditions, getErrorCondition("CreateOrUpdate", err))
return fmt.Errorf("cannot create database on Aiven side: %w", err)
}
}
Expand Down Expand Up @@ -93,7 +91,6 @@ func (h DatabaseHandler) delete(ctx context.Context, avn *aiven.Client, obj clie
db.Spec.ServiceName,
db.Name)
if err != nil && !aiven.IsNotFound(err) {
meta.SetStatusCondition(&db.Status.Conditions, getErrorCondition("Delete", err))
return false, err
}

Expand Down Expand Up @@ -138,11 +135,7 @@ func (h DatabaseHandler) checkPreconditions(ctx context.Context, avn *aiven.Clie
meta.SetStatusCondition(&db.Status.Conditions,
getInitializedCondition("Preconditions", "Checking preconditions"))

running, err := checkServiceIsRunning(ctx, avn, db.Spec.Project, db.Spec.ServiceName)
if err != nil {
meta.SetStatusCondition(&db.Status.Conditions, getErrorCondition("Preconditions", err))
}
return running, err
return checkServiceIsRunning(ctx, avn, db.Spec.Project, db.Spec.ServiceName)
}

func (h DatabaseHandler) convert(i client.Object) (*v1alpha1.Database, error) {
Expand Down
Loading

0 comments on commit 045f99e

Please sign in to comment.