From 4ddde8f80d0059d810ceb274113a2690062a68f6 Mon Sep 17 00:00:00 2001 From: Murad Biashimov Date: Tue, 5 Dec 2023 11:47:19 +0100 Subject: [PATCH] fix(controllers): object not committed properly (#550) Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- CHANGELOG.md | 2 + Makefile | 2 +- .../service/kafka/zz_generated.deepcopy.go | 5 + controllers/basic_controller.go | 164 +++++++++--------- controllers/clickhouseuser_controller.go | 4 - controllers/common.go | 12 +- controllers/connectionpool_controller.go | 5 - controllers/database_controller.go | 9 +- controllers/generic_service_handler.go | 13 +- controllers/kafkaacl_controller.go | 7 +- controllers/kafkaconnector_controller.go | 15 +- controllers/kafkatopic_controller.go | 10 +- controllers/project_controller.go | 1 - go.mod | 4 +- go.sum | 4 - tests/generic_service_handler_test.go | 59 +++++++ 16 files changed, 169 insertions(+), 147 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 01e752fa..8e1db425 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,8 @@ ## [MAJOR.MINOR.PATCH] - YYYY-MM-DD +- Set conditions on errors: `Preconditions`, `CreateOrUpdate`, `Delete`. Thanks to @atarax +- Fix object updates lost when reconciler exits before the object is committed - Add `Kafka` field `userConfig.kafka.transaction_partition_verification_enable`, type `boolean`: Enable verification that checks that the partition has been added to the transaction before writing transactional records to the partition diff --git a/Makefile b/Makefile index 40cca3da..899d25e9 100644 --- a/Makefile +++ b/Makefile @@ -104,7 +104,7 @@ boilerplate: controller-gen ## Generate code containing DeepCopy, DeepCopyInto, $(CONTROLLER_GEN) object:headerFile="hack/boilerplate.go.txt" paths="./..." .PHONY: generate -generate: boilerplate userconfigs imports manifests docs charts +generate: userconfigs boilerplate imports manifests docs charts .PHONY: fmt fmt: ## Run go fmt against code. diff --git a/api/v1alpha1/userconfig/service/kafka/zz_generated.deepcopy.go b/api/v1alpha1/userconfig/service/kafka/zz_generated.deepcopy.go index 474ce48c..f581d73d 100644 --- a/api/v1alpha1/userconfig/service/kafka/zz_generated.deepcopy.go +++ b/api/v1alpha1/userconfig/service/kafka/zz_generated.deepcopy.go @@ -245,6 +245,11 @@ func (in *Kafka) DeepCopyInto(out *Kafka) { *out = new(int) **out = **in } + if in.TransactionPartitionVerificationEnable != nil { + in, out := &in.TransactionPartitionVerificationEnable, &out.TransactionPartitionVerificationEnable + *out = new(bool) + **out = **in + } if in.TransactionRemoveExpiredTransactionCleanupIntervalMs != nil { in, out := &in.TransactionRemoveExpiredTransactionCleanupIntervalMs, &out.TransactionRemoveExpiredTransactionCleanupIntervalMs *out = new(int) diff --git a/controllers/basic_controller.go b/controllers/basic_controller.go index 4389695b..534ace59 100644 --- a/controllers/basic_controller.go +++ b/controllers/basic_controller.go @@ -2,15 +2,16 @@ package controllers import ( "context" + "errors" "fmt" "strings" "time" "github.com/aiven/aiven-go-client/v2" "github.com/go-logr/logr" - "github.com/hashicorp/go-multierror" - "github.com/pkg/errors" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/equality" + "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/tools/record" @@ -80,7 +81,6 @@ const ( eventUnableToDelete = "UnableToDelete" eventSuccessfullyDeletedAtAiven = "SuccessfullyDeletedAtAiven" eventAddedFinalizer = "InstanceFinalizerAdded" - eventUnableToAddFinalizer = "UnableToAddFinalizer" eventWaitingForPreconditions = "WaitingForPreconditions" eventUnableToWaitForPreconditions = "UnableToWaitForPreconditions" eventPreconditionsAreMet = "PreconditionsAreMet" @@ -126,14 +126,17 @@ func (c *Controller) reconcileInstance(ctx context.Context, req ctrl.Request, h return ctrl.Result{}, fmt.Errorf("cannot initialize aiven client: %w", err) } - return instanceReconcilerHelper{ + helper := instanceReconcilerHelper{ avn: avn, k8s: c.Client, h: h, log: instanceLogger, s: clientAuthSecret, rec: c.Recorder, - }.reconcileInstance(ctx, o) + } + + requeue, err := helper.reconcile(ctx, o) + return ctrl.Result{Requeue: requeue, RequeueAfter: requeueTimeout}, err } // a helper that closes over all instance specific fields @@ -157,53 +160,65 @@ type instanceReconcilerHelper struct { rec record.EventRecorder } -func (i instanceReconcilerHelper) reconcileInstance(ctx context.Context, o client.Object) (ctrl.Result, error) { - i.log.Info("reconciling instance") - i.rec.Event(o, corev1.EventTypeNormal, eventReconciliationStarted, "starting reconciliation") - - var err error - - defer func() { - // Order matters. - // First need to update the object, and then update the status. - // So dependent resources won't see READY before it has been updated with new values - // Clone is used so update won't overwrite in-memory values - // We need to this on outer level because we otherwise wouldn't set status-conditions - // properly when preconditions fail - clone := o.DeepCopyObject().(client.Object) - err = multierror.Append(err, i.k8s.Update(ctx, clone)) - - // Original object has been updated - o.SetResourceVersion(clone.GetResourceVersion()) - - // It's ready to cast its status - err = multierror.Append(err, i.k8s.Status().Update(ctx, o)) - err = err.(*multierror.Error).ErrorOrNil() - }() - +func (i *instanceReconcilerHelper) reconcile(ctx context.Context, o v1alpha1.AivenManagedObject) (bool, error) { + // Deletion if isMarkedForDeletion(o) { if controllerutil.ContainsFinalizer(o, instanceDeletionFinalizer) { return i.finalize(ctx, o) } - return ctrl.Result{}, nil + return false, nil } + // Create or update. + // Even if reconcile fails, we need to update the object in kube + // to save conditions and other data. + // So we don't exit on error. + orig := o.DeepCopyObject().(v1alpha1.AivenManagedObject) + requeue, err := i.reconcileInstance(ctx, o) + if equality.Semantic.DeepEqual(orig, o) { + return requeue, err + } + + // Order matters. + // First need to update the object, and then update the status. + // So dependent resources won't see READY before it has been updated with new values + + // When updated, object status is vanished. + // So we waste a copy for that, + // while the original object must already have all the fields updated in runtime + clone := o.DeepCopyObject().(client.Object) + errUpdate := i.k8s.Update(ctx, clone) + + // Now we can update the status + o.SetResourceVersion(clone.GetResourceVersion()) + errStatus := i.k8s.Status().Update(ctx, o) + errMerged := errors.Join(err, errUpdate, errStatus) + if errMerged != nil { + return true, errMerged + } + + return requeue, nil +} + +func (i *instanceReconcilerHelper) reconcileInstance(ctx context.Context, o v1alpha1.AivenManagedObject) (bool, error) { + i.log.Info("reconciling instance") + i.rec.Event(o, corev1.EventTypeNormal, eventReconciliationStarted, "starting reconciliation") + // Add finalizers to an instance and associated secret, only if they haven't // been added in the previous reconciliation loops if i.s != nil { if !controllerutil.ContainsFinalizer(i.s, secretProtectionFinalizer) { i.log.Info("adding finalizer to secret") if err := addFinalizer(ctx, i.k8s, i.s, secretProtectionFinalizer); err != nil { - return ctrl.Result{}, fmt.Errorf("unable to add finalizer to secret: %w", err) + return false, fmt.Errorf("unable to add finalizer to secret: %w", err) } } } + if !controllerutil.ContainsFinalizer(o, instanceDeletionFinalizer) { + // Adds finalizer. The commit is performed in the outer function i.log.Info("adding finalizer to instance") - if err := addFinalizer(ctx, i.k8s, o, instanceDeletionFinalizer); err != nil { - i.rec.Eventf(o, corev1.EventTypeWarning, eventUnableToAddFinalizer, err.Error()) - return ctrl.Result{}, fmt.Errorf("unable to add finalizer to instance: %w", err) - } + controllerutil.AddFinalizer(o, instanceDeletionFinalizer) i.rec.Event(o, corev1.EventTypeNormal, eventAddedFinalizer, "instance finalizer added") } @@ -212,58 +227,51 @@ func (i instanceReconcilerHelper) reconcileInstance(ctx context.Context, o clien refs, err := i.getObjectRefs(ctx, o) if err != nil { i.log.Info(fmt.Sprintf("one or more references can't be found yet: %s", err)) - return ctrl.Result{Requeue: true, RequeueAfter: requeueTimeout}, nil + return true, nil } requeue, err := i.checkPreconditions(ctx, o, refs) if requeue { - // It must be possible to return requeue and error by design. - // By the time this comment created, there is no such case in checkPreconditions() - return ctrl.Result{Requeue: true, RequeueAfter: requeueTimeout}, err + return true, err } + if err != nil { - return ctrl.Result{}, err + meta.SetStatusCondition(o.Conditions(), getErrorCondition(errConditionPreconditions, err)) + return false, err } if !isAlreadyProcessed(o) { i.rec.Event(o, corev1.EventTypeNormal, eventCreateOrUpdatedAtAiven, "about to create instance at aiven") if err := i.createOrUpdateInstance(ctx, o, refs); err != nil { i.rec.Event(o, corev1.EventTypeWarning, eventUnableToCreateOrUpdateAtAiven, err.Error()) - return ctrl.Result{}, fmt.Errorf("unable to create or update instance at aiven: %w", err) + return false, fmt.Errorf("unable to create or update instance at aiven: %w", err) } i.rec.Event(o, corev1.EventTypeNormal, eventCreatedOrUpdatedAtAiven, "instance was created at aiven but may not be running yet") } i.rec.Event(o, corev1.EventTypeNormal, eventWaitingForTheInstanceToBeRunning, "waiting for the instance to be running") - isRunning, err := i.updateInstanceStateAndSecretUntilRunning(ctx, o) + err = i.updateInstanceStateAndSecretUntilRunning(ctx, o) if err != nil { if aiven.IsNotFound(err) { - return ctrl.Result{ - Requeue: true, - RequeueAfter: requeueTimeout, - }, nil + return true, nil } i.rec.Event(o, corev1.EventTypeWarning, eventUnableToWaitForInstanceToBeRunning, err.Error()) - return ctrl.Result{}, fmt.Errorf("unable to wait until instance is running: %w", err) + return false, fmt.Errorf("unable to wait until instance is running: %w", err) } - if !isRunning { + if !IsAlreadyRunning(o) { i.log.Info("instance is not yet running, triggering requeue") - return ctrl.Result{ - Requeue: true, - RequeueAfter: requeueTimeout, - }, nil + return true, nil } i.rec.Event(o, corev1.EventTypeNormal, eventInstanceIsRunning, "instance is in a RUNNING state") i.log.Info("instance was successfully reconciled") - - return ctrl.Result{}, nil + return false, nil } -func (i instanceReconcilerHelper) checkPreconditions(ctx context.Context, o client.Object, refs []client.Object) (bool, error) { +func (i *instanceReconcilerHelper) checkPreconditions(ctx context.Context, o client.Object, refs []client.Object) (bool, error) { i.rec.Event(o, corev1.EventTypeNormal, eventWaitingForPreconditions, "waiting for preconditions of the instance") // Checks references @@ -293,7 +301,7 @@ func (i instanceReconcilerHelper) checkPreconditions(ctx context.Context, o clie return false, nil } -func (i instanceReconcilerHelper) getObjectRefs(ctx context.Context, o client.Object) ([]client.Object, error) { +func (i *instanceReconcilerHelper) getObjectRefs(ctx context.Context, o client.Object) ([]client.Object, error) { refsObj, ok := o.(refsObject) if !ok { return nil, nil @@ -330,7 +338,7 @@ func (i instanceReconcilerHelper) getObjectRefs(ctx context.Context, o client.Ob // finalize runs finalization logic. If the finalization logic fails, don't remove the finalizer so // that we can retry during the next reconciliation. When applicable, it retrieves an associated object that // has to be deleted from Kubernetes, and it could be a secret associated with an instance. -func (i instanceReconcilerHelper) finalize(ctx context.Context, o client.Object) (ctrl.Result, error) { +func (i *instanceReconcilerHelper) finalize(ctx context.Context, o v1alpha1.AivenManagedObject) (bool, error) { i.rec.Event(o, corev1.EventTypeNormal, eventTryingToDeleteAtAiven, "trying to delete instance at aiven") var err error @@ -349,6 +357,9 @@ func (i instanceReconcilerHelper) finalize(ctx context.Context, o client.Object) if deletionPolicy == deletionPolicyDelete { finalised, err = i.h.delete(ctx, i.avn, o) + if err != nil { + meta.SetStatusCondition(o.Conditions(), getErrorCondition(errConditionDelete, err)) + } } // There are dependencies on Aiven side, resets error, so it goes for requeue @@ -367,24 +378,21 @@ func (i instanceReconcilerHelper) finalize(ctx context.Context, o client.Object) finalised = true } else if aiven.IsNotFound(err) { i.rec.Event(o, corev1.EventTypeWarning, eventUnableToDeleteAtAiven, err.Error()) - return ctrl.Result{}, fmt.Errorf("unable to delete instance at aiven: %w", err) + return false, fmt.Errorf("unable to delete instance at aiven: %w", err) } else if isAivenServerError(err) { // If failed to delete, retries i.log.Info(fmt.Sprintf("unable to delete instance at aiven: %s", err)) err = nil } else { i.rec.Event(o, corev1.EventTypeWarning, eventUnableToDelete, err.Error()) - return ctrl.Result{}, fmt.Errorf("unable to delete instance: %w", err) + return false, fmt.Errorf("unable to delete instance: %w", err) } } // checking if instance was finalized, if not triggering a requeue if !finalised { i.log.Info("instance is not yet deleted at aiven, triggering requeue") - return ctrl.Result{ - Requeue: true, - RequeueAfter: requeueTimeout, - }, nil + return true, nil } i.log.Info("instance was successfully deleted at aiven, removing finalizer") @@ -393,28 +401,29 @@ func (i instanceReconcilerHelper) finalize(ctx context.Context, o client.Object) // remove finalizer, once all finalizers have been removed, the object will be deleted. if err := removeFinalizer(ctx, i.k8s, o, instanceDeletionFinalizer); err != nil { i.rec.Event(o, corev1.EventTypeWarning, eventUnableToDeleteFinalizer, err.Error()) - return ctrl.Result{}, fmt.Errorf("unable to remove finalizer: %w", err) + return false, fmt.Errorf("unable to remove finalizer: %w", err) } i.log.Info("finalizer was removed, instance is deleted") - return ctrl.Result{}, nil + return false, nil } // isInvalidTokenError checks if the error is related to invalid token -func (i instanceReconcilerHelper) isInvalidTokenError(err error) bool { +func (i *instanceReconcilerHelper) isInvalidTokenError(err error) bool { // When an instance was created but pointing to an invalid API token // and no generation was ever processed, allow deleting such instance msg := err.Error() return strings.Contains(msg, "Invalid token") || strings.Contains(msg, "Missing (expired) db token") } -func (i instanceReconcilerHelper) createOrUpdateInstance(ctx context.Context, o client.Object, refs []client.Object) error { +func (i *instanceReconcilerHelper) createOrUpdateInstance(ctx context.Context, o v1alpha1.AivenManagedObject, refs []client.Object) error { i.log.Info("generation wasn't processed, creation or updating instance on aiven side") a := o.GetAnnotations() delete(a, processedGenerationAnnotation) delete(a, instanceIsRunningAnnotation) if err := i.h.createOrUpdate(ctx, i.avn, o, refs); err != nil { + meta.SetStatusCondition(o.Conditions(), getErrorCondition(errConditionCreateOrUpdate, err)) return fmt.Errorf("unable to create or update aiven instance: %w", err) } @@ -426,25 +435,18 @@ func (i instanceReconcilerHelper) createOrUpdateInstance(ctx context.Context, o return nil } -func (i instanceReconcilerHelper) updateInstanceStateAndSecretUntilRunning(ctx context.Context, o client.Object) (bool, error) { +func (i *instanceReconcilerHelper) updateInstanceStateAndSecretUntilRunning(ctx context.Context, o client.Object) error { i.log.Info("checking if instance is ready") - serviceSecret, err := i.h.get(ctx, i.avn, o) - if err != nil { - return false, err - } else if serviceSecret != nil { - if err = i.createOrUpdateSecret(ctx, o, serviceSecret); err != nil { - return false, fmt.Errorf("unable to create or update aiven secret: %w", err) - } + secret, err := i.h.get(ctx, i.avn, o) + if secret == nil || err != nil { + return err } - return IsAlreadyRunning(o), err - -} -func (i instanceReconcilerHelper) createOrUpdateSecret(ctx context.Context, owner client.Object, want *corev1.Secret) error { - _, err := controllerutil.CreateOrUpdate(ctx, i.k8s, want, func() error { - return ctrl.SetControllerReference(owner, want, i.k8s.Scheme()) + _, err = controllerutil.CreateOrUpdate(ctx, i.k8s, secret, func() error { + return controllerutil.SetControllerReference(o, secret, i.k8s.Scheme()) }) + return err } diff --git a/controllers/clickhouseuser_controller.go b/controllers/clickhouseuser_controller.go index 77631eff..7217fb18 100644 --- a/controllers/clickhouseuser_controller.go +++ b/controllers/clickhouseuser_controller.go @@ -53,10 +53,6 @@ func (h *clickhouseUserHandler) createOrUpdate(ctx context.Context, avn *aiven.C return err } - if err != nil && !aiven.IsAlreadyExists(err) { - return fmt.Errorf("cannot createOrUpdate clickhouse user on aiven side: %w", err) - } - user.Status.UUID = r.User.UUID meta.SetStatusCondition(&user.Status.Conditions, diff --git a/controllers/common.go b/controllers/common.go index 379db068..2c724b5f 100644 --- a/controllers/common.go +++ b/controllers/common.go @@ -35,6 +35,14 @@ const ( deletionPolicyDelete = "Delete" ) +type errCondition string + +const ( + errConditionDelete errCondition = "Delete" + errConditionPreconditions errCondition = "Preconditions" + errConditionCreateOrUpdate errCondition = "CreateOrUpdate" +) + var ( version = "dev" errTerminationProtectionOn = errors.New("termination protection is on") @@ -71,11 +79,11 @@ func getRunningCondition(status metav1.ConditionStatus, reason, message string) } } -func getErrorCondition(reason string, err error) metav1.Condition { +func getErrorCondition(reason errCondition, err error) metav1.Condition { return metav1.Condition{ Type: conditionTypeError, Status: metav1.ConditionUnknown, - Reason: reason, + Reason: string(reason), Message: err.Error(), } } diff --git a/controllers/connectionpool_controller.go b/controllers/connectionpool_controller.go index df647257..d3843a8a 100644 --- a/controllers/connectionpool_controller.go +++ b/controllers/connectionpool_controller.go @@ -48,7 +48,6 @@ func (h ConnectionPoolHandler) createOrUpdate(ctx context.Context, avn *aiven.Cl exists, err := h.exists(ctx, avn, cp) if err != nil { - meta.SetStatusCondition(&cp.Status.Conditions, getErrorCondition("CheckExists", err)) return err } var reason string @@ -62,7 +61,6 @@ func (h ConnectionPoolHandler) createOrUpdate(ctx context.Context, avn *aiven.Cl Username: optionalStringPointer(cp.Spec.Username), }) if err != nil && !aiven.IsAlreadyExists(err) { - meta.SetStatusCondition(&cp.Status.Conditions, getErrorCondition("Create", err)) return err } reason = "Created" @@ -75,7 +73,6 @@ func (h ConnectionPoolHandler) createOrUpdate(ctx context.Context, avn *aiven.Cl Username: optionalStringPointer(cp.Spec.Username), }) if err != nil { - meta.SetStatusCondition(&cp.Status.Conditions, getErrorCondition("Update", err)) return err } reason = "Updated" @@ -103,7 +100,6 @@ func (h ConnectionPoolHandler) delete(ctx context.Context, avn *aiven.Client, ob err = avn.ConnectionPools.Delete(ctx, cp.Spec.Project, cp.Spec.ServiceName, cp.Name) if err != nil && !aiven.IsNotFound(err) { - meta.SetStatusCondition(&cp.Status.Conditions, getErrorCondition("Delete", err)) return false, err } @@ -204,7 +200,6 @@ func (h ConnectionPoolHandler) checkPreconditions(ctx context.Context, avn *aive check, err := checkServiceIsRunning(ctx, avn, cp.Spec.Project, cp.Spec.ServiceName) if err != nil { - meta.SetStatusCondition(&cp.Status.Conditions, getErrorCondition("Preconditions", err)) return false, err } diff --git a/controllers/database_controller.go b/controllers/database_controller.go index ebb2eb66..b782108e 100644 --- a/controllers/database_controller.go +++ b/controllers/database_controller.go @@ -47,7 +47,6 @@ func (h DatabaseHandler) createOrUpdate(ctx context.Context, avn *aiven.Client, exists, err := h.exists(ctx, avn, db) if err != nil { - meta.SetStatusCondition(&db.Status.Conditions, getErrorCondition("CheckExists", err)) return err } @@ -58,7 +57,6 @@ func (h DatabaseHandler) createOrUpdate(ctx context.Context, avn *aiven.Client, LcType: db.Spec.LcCtype, }) if err != nil { - meta.SetStatusCondition(&db.Status.Conditions, getErrorCondition("CreateOrUpdate", err)) return fmt.Errorf("cannot create database on Aiven side: %w", err) } } @@ -93,7 +91,6 @@ func (h DatabaseHandler) delete(ctx context.Context, avn *aiven.Client, obj clie db.Spec.ServiceName, db.Name) if err != nil && !aiven.IsNotFound(err) { - meta.SetStatusCondition(&db.Status.Conditions, getErrorCondition("Delete", err)) return false, err } @@ -138,11 +135,7 @@ func (h DatabaseHandler) checkPreconditions(ctx context.Context, avn *aiven.Clie meta.SetStatusCondition(&db.Status.Conditions, getInitializedCondition("Preconditions", "Checking preconditions")) - running, err := checkServiceIsRunning(ctx, avn, db.Spec.Project, db.Spec.ServiceName) - if err != nil { - meta.SetStatusCondition(&db.Status.Conditions, getErrorCondition("Preconditions", err)) - } - return running, err + return checkServiceIsRunning(ctx, avn, db.Spec.Project, db.Spec.ServiceName) } func (h DatabaseHandler) convert(i client.Object) (*v1alpha1.Database, error) { diff --git a/controllers/generic_service_handler.go b/controllers/generic_service_handler.go index d79d02bd..56d6318b 100644 --- a/controllers/generic_service_handler.go +++ b/controllers/generic_service_handler.go @@ -45,7 +45,6 @@ func (h *genericServiceHandler) createOrUpdate(ctx context.Context, avn *aiven.C _, err = avn.Services.Get(ctx, spec.Project, ometa.Name) exists := err == nil if !exists && !aiven.IsNotFound(err) { - meta.SetStatusCondition(&o.getServiceStatus().Conditions, getErrorCondition("CheckExists", err)) return fmt.Errorf("failed to fetch service: %w", err) } @@ -81,14 +80,12 @@ func (h *genericServiceHandler) createOrUpdate(ctx context.Context, avn *aiven.C _, err = avn.Services.Create(ctx, spec.Project, req) if err != nil { - meta.SetStatusCondition(&o.getServiceStatus().Conditions, getErrorCondition("Create", err)) return fmt.Errorf("failed to create service: %w", err) } } else { reason = "Updated" userConfig, err := UpdateUserConfiguration(o.getUserConfig()) if err != nil { - meta.SetStatusCondition(&o.getServiceStatus().Conditions, getErrorCondition("Update", err)) return err } @@ -104,7 +101,6 @@ func (h *genericServiceHandler) createOrUpdate(ctx context.Context, avn *aiven.C } _, err = avn.Services.Update(ctx, spec.Project, ometa.Name, req) if err != nil { - meta.SetStatusCondition(&o.getServiceStatus().Conditions, getErrorCondition("Update", err)) return fmt.Errorf("failed to update service: %w", err) } } @@ -120,7 +116,6 @@ func (h *genericServiceHandler) createOrUpdate(ctx context.Context, avn *aiven.C } _, err = avn.ServiceTags.Set(ctx, spec.Project, ometa.Name, req) if err != nil { - meta.SetStatusCondition(&o.getServiceStatus().Conditions, getErrorCondition("Update", err)) return fmt.Errorf("failed to update tags: %w", err) } @@ -154,7 +149,6 @@ func (h *genericServiceHandler) delete(ctx context.Context, avn *aiven.Client, o return true, nil } - meta.SetStatusCondition(&o.getServiceStatus().Conditions, getErrorCondition("Delete", err)) return false, fmt.Errorf("failed to delete service in Aiven: %w", err) } @@ -197,11 +191,8 @@ func (h *genericServiceHandler) checkPreconditions(ctx context.Context, avn *aiv // If not, the wrapper controller will try later if s.IntegrationType == "read_replica" { r, err := checkServiceIsRunning(ctx, avn, spec.Project, s.SourceServiceName) - if err != nil { - meta.SetStatusCondition(&o.getServiceStatus().Conditions, getErrorCondition("Preconditions", err)) - } - if !(r && err == nil) { - return false, nil + if !r || err != nil { + return false, err } } } diff --git a/controllers/kafkaacl_controller.go b/controllers/kafkaacl_controller.go index 8db6e575..9e6af9ad 100644 --- a/controllers/kafkaacl_controller.go +++ b/controllers/kafkaacl_controller.go @@ -48,7 +48,6 @@ func (h KafkaACLHandler) createOrUpdate(ctx context.Context, avn *aiven.Client, // Tries to delete it instead _, err = h.delete(ctx, avn, obj) if err != nil { - meta.SetStatusCondition(&acl.Status.Conditions, getErrorCondition("Delete", err)) return err } @@ -161,11 +160,7 @@ func (h KafkaACLHandler) checkPreconditions(ctx context.Context, avn *aiven.Clie meta.SetStatusCondition(&acl.Status.Conditions, getInitializedCondition("Preconditions", "Checking preconditions")) - running, err := checkServiceIsRunning(ctx, avn, acl.Spec.Project, acl.Spec.ServiceName) - if err != nil { - meta.SetStatusCondition(&acl.Status.Conditions, getErrorCondition("Delete", err)) - } - return running, err + return checkServiceIsRunning(ctx, avn, acl.Spec.Project, acl.Spec.ServiceName) } func (h KafkaACLHandler) convert(i client.Object) (*v1alpha1.KafkaACL, error) { diff --git a/controllers/kafkaconnector_controller.go b/controllers/kafkaconnector_controller.go index f9903cee..bd86011f 100644 --- a/controllers/kafkaconnector_controller.go +++ b/controllers/kafkaconnector_controller.go @@ -52,13 +52,11 @@ func (h KafkaConnectorHandler) createOrUpdate(ctx context.Context, avn *aiven.Cl exists, err := h.exists(ctx, avn, conn) if err != nil { - meta.SetStatusCondition(&conn.Status.Conditions, getErrorCondition("CheckExists", err)) return fmt.Errorf("unable to check if kafka connector exists: %w", err) } connCfg, err := h.buildConnectorConfig(conn) if err != nil { - meta.SetStatusCondition(&conn.Status.Conditions, getErrorCondition("BuildConfig", err)) return fmt.Errorf("unable to build connector config: %w", err) } @@ -66,14 +64,12 @@ func (h KafkaConnectorHandler) createOrUpdate(ctx context.Context, avn *aiven.Cl if !exists { err = avn.KafkaConnectors.Create(ctx, conn.Spec.Project, conn.Spec.ServiceName, connCfg) if err != nil && !aiven.IsAlreadyExists(err) { - meta.SetStatusCondition(&conn.Status.Conditions, getErrorCondition("Create", err)) return err } reason = "Created" } else { - _, err := avn.KafkaConnectors.Update(ctx, conn.Spec.Project, conn.Spec.ServiceName, conn.Name, connCfg) + _, err = avn.KafkaConnectors.Update(ctx, conn.Spec.Project, conn.Spec.ServiceName, conn.Name, connCfg) if err != nil { - meta.SetStatusCondition(&conn.Status.Conditions, getErrorCondition("Update", err)) return err } reason = "Updated" @@ -135,7 +131,7 @@ func (h KafkaConnectorHandler) buildConnectorConfig(conn *v1alpha1.KafkaConnecto } m[k] = templateRes.String() } - return aiven.KafkaConnectorConfig(m), nil + return m, nil } func (h KafkaConnectorHandler) delete(ctx context.Context, avn *aiven.Client, obj client.Object) (bool, error) { @@ -145,7 +141,6 @@ func (h KafkaConnectorHandler) delete(ctx context.Context, avn *aiven.Client, ob } err = avn.KafkaConnectors.Delete(ctx, conn.Spec.Project, conn.Spec.ServiceName, conn.Name) if err != nil && !aiven.IsNotFound(err) { - meta.SetStatusCondition(&conn.Status.Conditions, getErrorCondition("Delete", err)) return false, fmt.Errorf("unable to delete kafka connector: %w", err) } return true, nil @@ -220,11 +215,7 @@ func (h KafkaConnectorHandler) checkPreconditions(ctx context.Context, avn *aive meta.SetStatusCondition(&conn.Status.Conditions, getInitializedCondition("Preconditions", "Checking preconditions")) - running, err := checkServiceIsRunning(ctx, avn, conn.Spec.Project, conn.Spec.ServiceName) - if err != nil { - meta.SetStatusCondition(&conn.Status.Conditions, getErrorCondition("Preconditions", err)) - } - return running, err + return checkServiceIsRunning(ctx, avn, conn.Spec.Project, conn.Spec.ServiceName) } func (h KafkaConnectorHandler) convert(o client.Object) (*v1alpha1.KafkaConnector, error) { diff --git a/controllers/kafkatopic_controller.go b/controllers/kafkatopic_controller.go index 7c278a59..12d4e320 100644 --- a/controllers/kafkatopic_controller.go +++ b/controllers/kafkatopic_controller.go @@ -53,7 +53,6 @@ func (h KafkaTopicHandler) createOrUpdate(ctx context.Context, avn *aiven.Client exists, err := h.exists(ctx, avn, topic) if err != nil { - meta.SetStatusCondition(&topic.Status.Conditions, getErrorCondition("CheckExists", err)) return err } @@ -67,7 +66,6 @@ func (h KafkaTopicHandler) createOrUpdate(ctx context.Context, avn *aiven.Client Config: convertKafkaTopicConfig(topic), }) if err != nil && !aiven.IsAlreadyExists(err) { - meta.SetStatusCondition(&topic.Status.Conditions, getErrorCondition("Create", err)) return err } @@ -81,7 +79,6 @@ func (h KafkaTopicHandler) createOrUpdate(ctx context.Context, avn *aiven.Client Config: convertKafkaTopicConfig(topic), }) if err != nil { - meta.SetStatusCondition(&topic.Status.Conditions, getErrorCondition("Update", err)) return fmt.Errorf("cannot update Kafka Topic: %w", err) } @@ -115,7 +112,6 @@ func (h KafkaTopicHandler) delete(ctx context.Context, avn *aiven.Client, obj cl // Delete project on Aiven side err = avn.KafkaTopics.Delete(ctx, topic.Spec.Project, topic.Spec.ServiceName, topic.GetTopicName()) if err != nil && !aiven.IsNotFound(err) { - meta.SetStatusCondition(&topic.Status.Conditions, getErrorCondition("Delete", err)) return false, err } @@ -172,11 +168,7 @@ func (h KafkaTopicHandler) checkPreconditions(ctx context.Context, avn *aiven.Cl meta.SetStatusCondition(&topic.Status.Conditions, getInitializedCondition("Preconditions", "Checking preconditions")) - running, err := checkServiceIsRunning(ctx, avn, topic.Spec.Project, topic.Spec.ServiceName) - if err != nil { - meta.SetStatusCondition(&topic.Status.Conditions, getErrorCondition("Preconditions", err)) - } - return running, err + return checkServiceIsRunning(ctx, avn, topic.Spec.Project, topic.Spec.ServiceName) } func (h KafkaTopicHandler) getState(ctx context.Context, avn *aiven.Client, topic *v1alpha1.KafkaTopic) (string, error) { diff --git a/controllers/project_controller.go b/controllers/project_controller.go index fc1be4ff..8096c20f 100644 --- a/controllers/project_controller.go +++ b/controllers/project_controller.go @@ -77,7 +77,6 @@ func (h ProjectHandler) createOrUpdate(ctx context.Context, avn *aiven.Client, o exists, err := h.exists(ctx, avn, project) if err != nil { - meta.SetStatusCondition(&project.Status.Conditions, getErrorCondition("CheckExists", err)) return fmt.Errorf("project does not exists: %w", err) } diff --git a/go.mod b/go.mod index f235e736..b321bc70 100644 --- a/go.mod +++ b/go.mod @@ -10,10 +10,8 @@ require ( github.com/ghodss/yaml v1.0.0 github.com/go-logr/logr v1.3.0 github.com/google/go-cmp v0.6.0 - github.com/hashicorp/go-multierror v1.1.1 github.com/liip/sheriff v0.11.1 github.com/otiai10/copy v1.14.0 - github.com/pkg/errors v0.9.1 github.com/stoewer/go-strcase v1.3.0 github.com/stretchr/testify v1.8.4 github.com/xeipuuv/gojsonschema v1.2.0 @@ -52,7 +50,6 @@ require ( github.com/google/gnostic v0.5.7-v3refs // indirect github.com/google/gofuzz v1.1.0 // indirect github.com/google/uuid v1.3.0 // indirect - github.com/hashicorp/errwrap v1.0.0 // indirect github.com/hashicorp/go-cleanhttp v0.5.2 // indirect github.com/hashicorp/go-retryablehttp v0.7.4 // indirect github.com/hashicorp/go-version v0.0.0-20161031182605-e96d38404026 // indirect @@ -66,6 +63,7 @@ require ( github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/onsi/ginkgo/v2 v2.3.1 // indirect github.com/onsi/gomega v1.22.1 // indirect + github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/prometheus/client_golang v1.14.0 // indirect github.com/prometheus/client_model v0.3.0 // indirect diff --git a/go.sum b/go.sum index 760e0483..d1222788 100644 --- a/go.sum +++ b/go.sum @@ -234,14 +234,10 @@ github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+ github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk= github.com/googleapis/gax-go/v2 v2.1.0/go.mod h1:Q3nei7sK6ybPYH7twZdmQpAd1MKb7pfu6SK+H1/DsU0= github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw= -github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA= -github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= github.com/hashicorp/go-cleanhttp v0.5.2 h1:035FKYIWjmULyFRBKPs8TBQoi0x6d9G4xc9neXJWAZQ= github.com/hashicorp/go-cleanhttp v0.5.2/go.mod h1:kO/YDlP8L1346E6Sodw+PrpBSV4/SoxCXGY6BqNFT48= github.com/hashicorp/go-hclog v0.9.2 h1:CG6TE5H9/JXsFWJCfoIVpKFIkFe6ysEuHirp4DxCsHI= github.com/hashicorp/go-hclog v0.9.2/go.mod h1:5CU+agLiy3J7N7QjHK5d05KxGsuXiQLrjA0H7acj2lQ= -github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo= -github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM= github.com/hashicorp/go-retryablehttp v0.7.4 h1:ZQgVdpTdAL7WpMIwLzCfbalOcSUdkDZnpUv3/+BxzFA= github.com/hashicorp/go-retryablehttp v0.7.4/go.mod h1:Jy/gPYAdjqffZ/yFGCFV2doI5wjtH1ewM9u8iYVjtX8= github.com/hashicorp/go-version v0.0.0-20161031182605-e96d38404026 h1:qWx/DcC6l4ZzuS+JBAzI5XjtLFDCc08zYeZ0kLnaH2g= diff --git a/tests/generic_service_handler_test.go b/tests/generic_service_handler_test.go index c6a1aabf..c096d821 100644 --- a/tests/generic_service_handler_test.go +++ b/tests/generic_service_handler_test.go @@ -4,9 +4,12 @@ import ( "context" "fmt" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + apierrors "k8s.io/apimachinery/pkg/api/errors" + "sigs.k8s.io/controller-runtime/pkg/client" "github.com/aiven/aiven-operator/api/v1alpha1" ) @@ -90,3 +93,59 @@ func TestCreateUpdateService(t *testing.T) { require.NoError(t, err) assert.Empty(t, tagsUpdatedAvn.Tags) // cleared tags } + +func getErrorConditionYaml(project, pgName string) string { + return fmt.Sprintf(` +apiVersion: aiven.io/v1alpha1 +kind: PostgreSQL +metadata: + name: %[2]s +spec: + authSecretRef: + name: aiven-token + key: token + + project: %[1]s + cloudName: google-europe-west1 + plan: startup-1234 +`, project, pgName) +} + +func TestErrorCondition(t *testing.T) { + t.Parallel() + defer recoverPanic(t) + + // GIVEN + ctx := context.Background() + pgName := randName("generic-handler") + yml := getErrorConditionYaml(testProject, pgName) + s := NewSession(k8sClient, avnClient, testProject) + + // Cleans test afterwards + defer s.Destroy() + + // WHEN + // Applies given manifest + require.NoError(t, s.Apply(yml)) + + // THEN + pg := new(v1alpha1.PostgreSQL) + for *pg.Conditions() == nil { + err := k8sClient.Get(ctx, client.ObjectKey{Namespace: defaultNamespace, Name: pgName}, pg) + if apierrors.IsNotFound(err) { + // Ignore not found, because it takes time to commit a resource to the storage + err = nil + } + require.NoError(t, err) + time.Sleep(time.Second) + } + + found := false + for _, c := range *pg.Conditions() { + if c.Reason == "CreateOrUpdate" { + found = true + assert.Contains(t, c.Message, "Plan 'startup-1234' for service type ServiceType.pg is not available in cloud 'google-europe-west1'") + } + } + assert.True(t, found) +}