From 3cffe03a29a9dad8d0b711ec8e4e6f4d9d09a557 Mon Sep 17 00:00:00 2001 From: atarax Date: Thu, 16 Nov 2023 18:20:14 +0100 Subject: [PATCH] feat: implement error-conditions --- controllers/basic_controller.go | 40 +++++++++++++----------- controllers/common.go | 11 +++++++ controllers/connectionpool_controller.go | 5 +++ controllers/database_controller.go | 9 +++++- controllers/generic_service_handler.go | 9 ++++++ controllers/kafkaacl_controller.go | 7 ++++- controllers/kafkaconnector_controller.go | 11 ++++++- controllers/kafkatopic_controller.go | 10 +++++- controllers/project_controller.go | 1 + 9 files changed, 81 insertions(+), 22 deletions(-) diff --git a/controllers/basic_controller.go b/controllers/basic_controller.go index 4aac1b0f..15025102 100644 --- a/controllers/basic_controller.go +++ b/controllers/basic_controller.go @@ -90,6 +90,7 @@ const ( eventWaitingForPreconditions = "WaitingForPreconditions" eventUnableToWaitForPreconditions = "UnableToWaitForPreconditions" eventPreconditionsAreMet = "PreconditionsAreMet" + eventPreconditionsNotMet = "PreconditionsNotMet" eventUnableToCreateOrUpdateAtAiven = "UnableToCreateOrUpdateAtAiven" eventCreateOrUpdatedAtAiven = "CreateOrUpdatedAtAiven" eventCreatedOrUpdatedAtAiven = "CreatedOrUpdatedAtAiven" @@ -166,6 +167,26 @@ func (i instanceReconcilerHelper) reconcileInstance(ctx context.Context, o clien i.log.Info("reconciling instance") i.rec.Event(o, corev1.EventTypeNormal, eventReconciliationStarted, "starting reconciliation") + var err error + + defer func() { + // Order matters. + // First need to update the object, and then update the status. + // So dependent resources won't see READY before it has been updated with new values + // Clone is used so update won't overwrite in-memory values + // We need to this on outer level because we otherwise wouldn't set status-conditions + // properly when preconditions fail + clone := o.DeepCopyObject().(client.Object) + err = multierror.Append(err, i.k8s.Update(ctx, clone)) + + // Original object has been updated + o.SetResourceVersion(clone.GetResourceVersion()) + + // It's ready to cast its status + err = multierror.Append(err, i.k8s.Status().Update(ctx, o)) + err = err.(*multierror.Error).ErrorOrNil() + }() + if isMarkedForDeletion(o) { if controllerutil.ContainsFinalizer(o, instanceDeletionFinalizer) { return i.finalize(ctx, o) @@ -269,6 +290,7 @@ func (i instanceReconcilerHelper) checkPreconditions(ctx context.Context, o clie } if !check { + i.rec.Event(o, corev1.EventTypeNormal, eventPreconditionsNotMet, "preconditions are not met, requeue") i.log.Info("preconditions are not met, requeue") return true, nil } @@ -411,26 +433,8 @@ func (i instanceReconcilerHelper) createOrUpdateInstance(ctx context.Context, o } func (i instanceReconcilerHelper) updateInstanceStateAndSecretUntilRunning(ctx context.Context, o client.Object) (bool, error) { - var err error - i.log.Info("checking if instance is ready") - defer func() { - // Order matters. - // First need to update the object, and then update the status. - // So dependent resources won't see READY before it has been updated with new values - // Clone is used so update won't overwrite in-memory values - clone := o.DeepCopyObject().(client.Object) - err = multierror.Append(err, i.k8s.Update(ctx, clone)) - - // Original object has been updated - o.SetResourceVersion(clone.GetResourceVersion()) - - // It's ready to cast its status - err = multierror.Append(err, i.k8s.Status().Update(ctx, o)) - err = err.(*multierror.Error).ErrorOrNil() - }() - serviceSecret, err := i.h.get(ctx, i.avn, o) if err != nil { return false, err diff --git a/controllers/common.go b/controllers/common.go index da1067ad..379db068 100644 --- a/controllers/common.go +++ b/controllers/common.go @@ -22,6 +22,7 @@ import ( const ( conditionTypeRunning = "Running" conditionTypeInitialized = "Initialized" + conditionTypeError = "Error" secretProtectionFinalizer = "finalizers.aiven.io/needed-to-delete-services" instanceDeletionFinalizer = "finalizers.aiven.io/delete-remote-resource" @@ -44,6 +45,7 @@ func checkServiceIsRunning(ctx context.Context, c *aiven.Client, project, servic if err != nil { // if service is not found, it is not running if aiven.IsNotFound(err) { + // this will swallow an error if the project doesn't exist and object is not project return false, nil } return false, err @@ -69,6 +71,15 @@ func getRunningCondition(status metav1.ConditionStatus, reason, message string) } } +func getErrorCondition(reason string, err error) metav1.Condition { + return metav1.Condition{ + Type: conditionTypeError, + Status: metav1.ConditionUnknown, + Reason: reason, + Message: err.Error(), + } +} + func isMarkedForDeletion(o client.Object) bool { return !o.GetDeletionTimestamp().IsZero() } diff --git a/controllers/connectionpool_controller.go b/controllers/connectionpool_controller.go index d3843a8a..df647257 100644 --- a/controllers/connectionpool_controller.go +++ b/controllers/connectionpool_controller.go @@ -48,6 +48,7 @@ func (h ConnectionPoolHandler) createOrUpdate(ctx context.Context, avn *aiven.Cl exists, err := h.exists(ctx, avn, cp) if err != nil { + meta.SetStatusCondition(&cp.Status.Conditions, getErrorCondition("CheckExists", err)) return err } var reason string @@ -61,6 +62,7 @@ func (h ConnectionPoolHandler) createOrUpdate(ctx context.Context, avn *aiven.Cl Username: optionalStringPointer(cp.Spec.Username), }) if err != nil && !aiven.IsAlreadyExists(err) { + meta.SetStatusCondition(&cp.Status.Conditions, getErrorCondition("Create", err)) return err } reason = "Created" @@ -73,6 +75,7 @@ func (h ConnectionPoolHandler) createOrUpdate(ctx context.Context, avn *aiven.Cl Username: optionalStringPointer(cp.Spec.Username), }) if err != nil { + meta.SetStatusCondition(&cp.Status.Conditions, getErrorCondition("Update", err)) return err } reason = "Updated" @@ -100,6 +103,7 @@ func (h ConnectionPoolHandler) delete(ctx context.Context, avn *aiven.Client, ob err = avn.ConnectionPools.Delete(ctx, cp.Spec.Project, cp.Spec.ServiceName, cp.Name) if err != nil && !aiven.IsNotFound(err) { + meta.SetStatusCondition(&cp.Status.Conditions, getErrorCondition("Delete", err)) return false, err } @@ -200,6 +204,7 @@ func (h ConnectionPoolHandler) checkPreconditions(ctx context.Context, avn *aive check, err := checkServiceIsRunning(ctx, avn, cp.Spec.Project, cp.Spec.ServiceName) if err != nil { + meta.SetStatusCondition(&cp.Status.Conditions, getErrorCondition("Preconditions", err)) return false, err } diff --git a/controllers/database_controller.go b/controllers/database_controller.go index b782108e..ebb2eb66 100644 --- a/controllers/database_controller.go +++ b/controllers/database_controller.go @@ -47,6 +47,7 @@ func (h DatabaseHandler) createOrUpdate(ctx context.Context, avn *aiven.Client, exists, err := h.exists(ctx, avn, db) if err != nil { + meta.SetStatusCondition(&db.Status.Conditions, getErrorCondition("CheckExists", err)) return err } @@ -57,6 +58,7 @@ func (h DatabaseHandler) createOrUpdate(ctx context.Context, avn *aiven.Client, LcType: db.Spec.LcCtype, }) if err != nil { + meta.SetStatusCondition(&db.Status.Conditions, getErrorCondition("CreateOrUpdate", err)) return fmt.Errorf("cannot create database on Aiven side: %w", err) } } @@ -91,6 +93,7 @@ func (h DatabaseHandler) delete(ctx context.Context, avn *aiven.Client, obj clie db.Spec.ServiceName, db.Name) if err != nil && !aiven.IsNotFound(err) { + meta.SetStatusCondition(&db.Status.Conditions, getErrorCondition("Delete", err)) return false, err } @@ -135,7 +138,11 @@ func (h DatabaseHandler) checkPreconditions(ctx context.Context, avn *aiven.Clie meta.SetStatusCondition(&db.Status.Conditions, getInitializedCondition("Preconditions", "Checking preconditions")) - return checkServiceIsRunning(ctx, avn, db.Spec.Project, db.Spec.ServiceName) + running, err := checkServiceIsRunning(ctx, avn, db.Spec.Project, db.Spec.ServiceName) + if err != nil { + meta.SetStatusCondition(&db.Status.Conditions, getErrorCondition("Preconditions", err)) + } + return running, err } func (h DatabaseHandler) convert(i client.Object) (*v1alpha1.Database, error) { diff --git a/controllers/generic_service_handler.go b/controllers/generic_service_handler.go index 181dcba6..d79d02bd 100644 --- a/controllers/generic_service_handler.go +++ b/controllers/generic_service_handler.go @@ -45,6 +45,7 @@ func (h *genericServiceHandler) createOrUpdate(ctx context.Context, avn *aiven.C _, err = avn.Services.Get(ctx, spec.Project, ometa.Name) exists := err == nil if !exists && !aiven.IsNotFound(err) { + meta.SetStatusCondition(&o.getServiceStatus().Conditions, getErrorCondition("CheckExists", err)) return fmt.Errorf("failed to fetch service: %w", err) } @@ -80,12 +81,14 @@ func (h *genericServiceHandler) createOrUpdate(ctx context.Context, avn *aiven.C _, err = avn.Services.Create(ctx, spec.Project, req) if err != nil { + meta.SetStatusCondition(&o.getServiceStatus().Conditions, getErrorCondition("Create", err)) return fmt.Errorf("failed to create service: %w", err) } } else { reason = "Updated" userConfig, err := UpdateUserConfiguration(o.getUserConfig()) if err != nil { + meta.SetStatusCondition(&o.getServiceStatus().Conditions, getErrorCondition("Update", err)) return err } @@ -101,6 +104,7 @@ func (h *genericServiceHandler) createOrUpdate(ctx context.Context, avn *aiven.C } _, err = avn.Services.Update(ctx, spec.Project, ometa.Name, req) if err != nil { + meta.SetStatusCondition(&o.getServiceStatus().Conditions, getErrorCondition("Update", err)) return fmt.Errorf("failed to update service: %w", err) } } @@ -116,6 +120,7 @@ func (h *genericServiceHandler) createOrUpdate(ctx context.Context, avn *aiven.C } _, err = avn.ServiceTags.Set(ctx, spec.Project, ometa.Name, req) if err != nil { + meta.SetStatusCondition(&o.getServiceStatus().Conditions, getErrorCondition("Update", err)) return fmt.Errorf("failed to update tags: %w", err) } @@ -149,6 +154,7 @@ func (h *genericServiceHandler) delete(ctx context.Context, avn *aiven.Client, o return true, nil } + meta.SetStatusCondition(&o.getServiceStatus().Conditions, getErrorCondition("Delete", err)) return false, fmt.Errorf("failed to delete service in Aiven: %w", err) } @@ -191,6 +197,9 @@ func (h *genericServiceHandler) checkPreconditions(ctx context.Context, avn *aiv // If not, the wrapper controller will try later if s.IntegrationType == "read_replica" { r, err := checkServiceIsRunning(ctx, avn, spec.Project, s.SourceServiceName) + if err != nil { + meta.SetStatusCondition(&o.getServiceStatus().Conditions, getErrorCondition("Preconditions", err)) + } if !(r && err == nil) { return false, nil } diff --git a/controllers/kafkaacl_controller.go b/controllers/kafkaacl_controller.go index 9e6af9ad..8db6e575 100644 --- a/controllers/kafkaacl_controller.go +++ b/controllers/kafkaacl_controller.go @@ -48,6 +48,7 @@ func (h KafkaACLHandler) createOrUpdate(ctx context.Context, avn *aiven.Client, // Tries to delete it instead _, err = h.delete(ctx, avn, obj) if err != nil { + meta.SetStatusCondition(&acl.Status.Conditions, getErrorCondition("Delete", err)) return err } @@ -160,7 +161,11 @@ func (h KafkaACLHandler) checkPreconditions(ctx context.Context, avn *aiven.Clie meta.SetStatusCondition(&acl.Status.Conditions, getInitializedCondition("Preconditions", "Checking preconditions")) - return checkServiceIsRunning(ctx, avn, acl.Spec.Project, acl.Spec.ServiceName) + running, err := checkServiceIsRunning(ctx, avn, acl.Spec.Project, acl.Spec.ServiceName) + if err != nil { + meta.SetStatusCondition(&acl.Status.Conditions, getErrorCondition("Delete", err)) + } + return running, err } func (h KafkaACLHandler) convert(i client.Object) (*v1alpha1.KafkaACL, error) { diff --git a/controllers/kafkaconnector_controller.go b/controllers/kafkaconnector_controller.go index afba0e5f..f9903cee 100644 --- a/controllers/kafkaconnector_controller.go +++ b/controllers/kafkaconnector_controller.go @@ -52,11 +52,13 @@ func (h KafkaConnectorHandler) createOrUpdate(ctx context.Context, avn *aiven.Cl exists, err := h.exists(ctx, avn, conn) if err != nil { + meta.SetStatusCondition(&conn.Status.Conditions, getErrorCondition("CheckExists", err)) return fmt.Errorf("unable to check if kafka connector exists: %w", err) } connCfg, err := h.buildConnectorConfig(conn) if err != nil { + meta.SetStatusCondition(&conn.Status.Conditions, getErrorCondition("BuildConfig", err)) return fmt.Errorf("unable to build connector config: %w", err) } @@ -64,12 +66,14 @@ func (h KafkaConnectorHandler) createOrUpdate(ctx context.Context, avn *aiven.Cl if !exists { err = avn.KafkaConnectors.Create(ctx, conn.Spec.Project, conn.Spec.ServiceName, connCfg) if err != nil && !aiven.IsAlreadyExists(err) { + meta.SetStatusCondition(&conn.Status.Conditions, getErrorCondition("Create", err)) return err } reason = "Created" } else { _, err := avn.KafkaConnectors.Update(ctx, conn.Spec.Project, conn.Spec.ServiceName, conn.Name, connCfg) if err != nil { + meta.SetStatusCondition(&conn.Status.Conditions, getErrorCondition("Update", err)) return err } reason = "Updated" @@ -141,6 +145,7 @@ func (h KafkaConnectorHandler) delete(ctx context.Context, avn *aiven.Client, ob } err = avn.KafkaConnectors.Delete(ctx, conn.Spec.Project, conn.Spec.ServiceName, conn.Name) if err != nil && !aiven.IsNotFound(err) { + meta.SetStatusCondition(&conn.Status.Conditions, getErrorCondition("Delete", err)) return false, fmt.Errorf("unable to delete kafka connector: %w", err) } return true, nil @@ -215,7 +220,11 @@ func (h KafkaConnectorHandler) checkPreconditions(ctx context.Context, avn *aive meta.SetStatusCondition(&conn.Status.Conditions, getInitializedCondition("Preconditions", "Checking preconditions")) - return checkServiceIsRunning(ctx, avn, conn.Spec.Project, conn.Spec.ServiceName) + running, err := checkServiceIsRunning(ctx, avn, conn.Spec.Project, conn.Spec.ServiceName) + if err != nil { + meta.SetStatusCondition(&conn.Status.Conditions, getErrorCondition("Preconditions", err)) + } + return running, err } func (h KafkaConnectorHandler) convert(o client.Object) (*v1alpha1.KafkaConnector, error) { diff --git a/controllers/kafkatopic_controller.go b/controllers/kafkatopic_controller.go index 12d4e320..7c278a59 100644 --- a/controllers/kafkatopic_controller.go +++ b/controllers/kafkatopic_controller.go @@ -53,6 +53,7 @@ func (h KafkaTopicHandler) createOrUpdate(ctx context.Context, avn *aiven.Client exists, err := h.exists(ctx, avn, topic) if err != nil { + meta.SetStatusCondition(&topic.Status.Conditions, getErrorCondition("CheckExists", err)) return err } @@ -66,6 +67,7 @@ func (h KafkaTopicHandler) createOrUpdate(ctx context.Context, avn *aiven.Client Config: convertKafkaTopicConfig(topic), }) if err != nil && !aiven.IsAlreadyExists(err) { + meta.SetStatusCondition(&topic.Status.Conditions, getErrorCondition("Create", err)) return err } @@ -79,6 +81,7 @@ func (h KafkaTopicHandler) createOrUpdate(ctx context.Context, avn *aiven.Client Config: convertKafkaTopicConfig(topic), }) if err != nil { + meta.SetStatusCondition(&topic.Status.Conditions, getErrorCondition("Update", err)) return fmt.Errorf("cannot update Kafka Topic: %w", err) } @@ -112,6 +115,7 @@ func (h KafkaTopicHandler) delete(ctx context.Context, avn *aiven.Client, obj cl // Delete project on Aiven side err = avn.KafkaTopics.Delete(ctx, topic.Spec.Project, topic.Spec.ServiceName, topic.GetTopicName()) if err != nil && !aiven.IsNotFound(err) { + meta.SetStatusCondition(&topic.Status.Conditions, getErrorCondition("Delete", err)) return false, err } @@ -168,7 +172,11 @@ func (h KafkaTopicHandler) checkPreconditions(ctx context.Context, avn *aiven.Cl meta.SetStatusCondition(&topic.Status.Conditions, getInitializedCondition("Preconditions", "Checking preconditions")) - return checkServiceIsRunning(ctx, avn, topic.Spec.Project, topic.Spec.ServiceName) + running, err := checkServiceIsRunning(ctx, avn, topic.Spec.Project, topic.Spec.ServiceName) + if err != nil { + meta.SetStatusCondition(&topic.Status.Conditions, getErrorCondition("Preconditions", err)) + } + return running, err } func (h KafkaTopicHandler) getState(ctx context.Context, avn *aiven.Client, topic *v1alpha1.KafkaTopic) (string, error) { diff --git a/controllers/project_controller.go b/controllers/project_controller.go index 8096c20f..fc1be4ff 100644 --- a/controllers/project_controller.go +++ b/controllers/project_controller.go @@ -77,6 +77,7 @@ func (h ProjectHandler) createOrUpdate(ctx context.Context, avn *aiven.Client, o exists, err := h.exists(ctx, avn, project) if err != nil { + meta.SetStatusCondition(&project.Status.Conditions, getErrorCondition("CheckExists", err)) return fmt.Errorf("project does not exists: %w", err) }