Skip to content

Commit

Permalink
feat: implement error-conditions
Browse files Browse the repository at this point in the history
  • Loading branch information
kaessert authored and byashimov committed Nov 21, 2023
1 parent f629b02 commit 3cffe03
Show file tree
Hide file tree
Showing 9 changed files with 81 additions and 22 deletions.
40 changes: 22 additions & 18 deletions controllers/basic_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ const (
eventWaitingForPreconditions = "WaitingForPreconditions"
eventUnableToWaitForPreconditions = "UnableToWaitForPreconditions"
eventPreconditionsAreMet = "PreconditionsAreMet"
eventPreconditionsNotMet = "PreconditionsNotMet"
eventUnableToCreateOrUpdateAtAiven = "UnableToCreateOrUpdateAtAiven"
eventCreateOrUpdatedAtAiven = "CreateOrUpdatedAtAiven"
eventCreatedOrUpdatedAtAiven = "CreatedOrUpdatedAtAiven"
Expand Down Expand Up @@ -166,6 +167,26 @@ func (i instanceReconcilerHelper) reconcileInstance(ctx context.Context, o clien
i.log.Info("reconciling instance")
i.rec.Event(o, corev1.EventTypeNormal, eventReconciliationStarted, "starting reconciliation")

var err error

defer func() {
// Order matters.
// First need to update the object, and then update the status.
// So dependent resources won't see READY before it has been updated with new values
// Clone is used so update won't overwrite in-memory values
// We need to this on outer level because we otherwise wouldn't set status-conditions
// properly when preconditions fail
clone := o.DeepCopyObject().(client.Object)
err = multierror.Append(err, i.k8s.Update(ctx, clone))

// Original object has been updated
o.SetResourceVersion(clone.GetResourceVersion())

// It's ready to cast its status
err = multierror.Append(err, i.k8s.Status().Update(ctx, o))
err = err.(*multierror.Error).ErrorOrNil()
}()

if isMarkedForDeletion(o) {
if controllerutil.ContainsFinalizer(o, instanceDeletionFinalizer) {
return i.finalize(ctx, o)
Expand Down Expand Up @@ -269,6 +290,7 @@ func (i instanceReconcilerHelper) checkPreconditions(ctx context.Context, o clie
}

if !check {
i.rec.Event(o, corev1.EventTypeNormal, eventPreconditionsNotMet, "preconditions are not met, requeue")
i.log.Info("preconditions are not met, requeue")
return true, nil
}
Expand Down Expand Up @@ -411,26 +433,8 @@ func (i instanceReconcilerHelper) createOrUpdateInstance(ctx context.Context, o
}

func (i instanceReconcilerHelper) updateInstanceStateAndSecretUntilRunning(ctx context.Context, o client.Object) (bool, error) {
var err error

i.log.Info("checking if instance is ready")

defer func() {
// Order matters.
// First need to update the object, and then update the status.
// So dependent resources won't see READY before it has been updated with new values
// Clone is used so update won't overwrite in-memory values
clone := o.DeepCopyObject().(client.Object)
err = multierror.Append(err, i.k8s.Update(ctx, clone))

// Original object has been updated
o.SetResourceVersion(clone.GetResourceVersion())

// It's ready to cast its status
err = multierror.Append(err, i.k8s.Status().Update(ctx, o))
err = err.(*multierror.Error).ErrorOrNil()
}()

serviceSecret, err := i.h.get(ctx, i.avn, o)
if err != nil {
return false, err
Expand Down
11 changes: 11 additions & 0 deletions controllers/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
const (
conditionTypeRunning = "Running"
conditionTypeInitialized = "Initialized"
conditionTypeError = "Error"

secretProtectionFinalizer = "finalizers.aiven.io/needed-to-delete-services"
instanceDeletionFinalizer = "finalizers.aiven.io/delete-remote-resource"
Expand All @@ -44,6 +45,7 @@ func checkServiceIsRunning(ctx context.Context, c *aiven.Client, project, servic
if err != nil {
// if service is not found, it is not running
if aiven.IsNotFound(err) {
// this will swallow an error if the project doesn't exist and object is not project
return false, nil
}
return false, err
Expand All @@ -69,6 +71,15 @@ func getRunningCondition(status metav1.ConditionStatus, reason, message string)
}
}

func getErrorCondition(reason string, err error) metav1.Condition {
return metav1.Condition{
Type: conditionTypeError,
Status: metav1.ConditionUnknown,
Reason: reason,
Message: err.Error(),
}
}

func isMarkedForDeletion(o client.Object) bool {
return !o.GetDeletionTimestamp().IsZero()
}
Expand Down
5 changes: 5 additions & 0 deletions controllers/connectionpool_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ func (h ConnectionPoolHandler) createOrUpdate(ctx context.Context, avn *aiven.Cl

exists, err := h.exists(ctx, avn, cp)
if err != nil {
meta.SetStatusCondition(&cp.Status.Conditions, getErrorCondition("CheckExists", err))
return err
}
var reason string
Expand All @@ -61,6 +62,7 @@ func (h ConnectionPoolHandler) createOrUpdate(ctx context.Context, avn *aiven.Cl
Username: optionalStringPointer(cp.Spec.Username),
})
if err != nil && !aiven.IsAlreadyExists(err) {
meta.SetStatusCondition(&cp.Status.Conditions, getErrorCondition("Create", err))
return err
}
reason = "Created"
Expand All @@ -73,6 +75,7 @@ func (h ConnectionPoolHandler) createOrUpdate(ctx context.Context, avn *aiven.Cl
Username: optionalStringPointer(cp.Spec.Username),
})
if err != nil {
meta.SetStatusCondition(&cp.Status.Conditions, getErrorCondition("Update", err))
return err
}
reason = "Updated"
Expand Down Expand Up @@ -100,6 +103,7 @@ func (h ConnectionPoolHandler) delete(ctx context.Context, avn *aiven.Client, ob

err = avn.ConnectionPools.Delete(ctx, cp.Spec.Project, cp.Spec.ServiceName, cp.Name)
if err != nil && !aiven.IsNotFound(err) {
meta.SetStatusCondition(&cp.Status.Conditions, getErrorCondition("Delete", err))
return false, err
}

Expand Down Expand Up @@ -200,6 +204,7 @@ func (h ConnectionPoolHandler) checkPreconditions(ctx context.Context, avn *aive

check, err := checkServiceIsRunning(ctx, avn, cp.Spec.Project, cp.Spec.ServiceName)
if err != nil {
meta.SetStatusCondition(&cp.Status.Conditions, getErrorCondition("Preconditions", err))
return false, err
}

Expand Down
9 changes: 8 additions & 1 deletion controllers/database_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ func (h DatabaseHandler) createOrUpdate(ctx context.Context, avn *aiven.Client,
exists, err := h.exists(ctx, avn, db)

if err != nil {
meta.SetStatusCondition(&db.Status.Conditions, getErrorCondition("CheckExists", err))
return err
}

Expand All @@ -57,6 +58,7 @@ func (h DatabaseHandler) createOrUpdate(ctx context.Context, avn *aiven.Client,
LcType: db.Spec.LcCtype,
})
if err != nil {
meta.SetStatusCondition(&db.Status.Conditions, getErrorCondition("CreateOrUpdate", err))
return fmt.Errorf("cannot create database on Aiven side: %w", err)
}
}
Expand Down Expand Up @@ -91,6 +93,7 @@ func (h DatabaseHandler) delete(ctx context.Context, avn *aiven.Client, obj clie
db.Spec.ServiceName,
db.Name)
if err != nil && !aiven.IsNotFound(err) {
meta.SetStatusCondition(&db.Status.Conditions, getErrorCondition("Delete", err))
return false, err
}

Expand Down Expand Up @@ -135,7 +138,11 @@ func (h DatabaseHandler) checkPreconditions(ctx context.Context, avn *aiven.Clie
meta.SetStatusCondition(&db.Status.Conditions,
getInitializedCondition("Preconditions", "Checking preconditions"))

return checkServiceIsRunning(ctx, avn, db.Spec.Project, db.Spec.ServiceName)
running, err := checkServiceIsRunning(ctx, avn, db.Spec.Project, db.Spec.ServiceName)
if err != nil {
meta.SetStatusCondition(&db.Status.Conditions, getErrorCondition("Preconditions", err))
}
return running, err
}

func (h DatabaseHandler) convert(i client.Object) (*v1alpha1.Database, error) {
Expand Down
9 changes: 9 additions & 0 deletions controllers/generic_service_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ func (h *genericServiceHandler) createOrUpdate(ctx context.Context, avn *aiven.C
_, err = avn.Services.Get(ctx, spec.Project, ometa.Name)
exists := err == nil
if !exists && !aiven.IsNotFound(err) {
meta.SetStatusCondition(&o.getServiceStatus().Conditions, getErrorCondition("CheckExists", err))
return fmt.Errorf("failed to fetch service: %w", err)
}

Expand Down Expand Up @@ -80,12 +81,14 @@ func (h *genericServiceHandler) createOrUpdate(ctx context.Context, avn *aiven.C

_, err = avn.Services.Create(ctx, spec.Project, req)
if err != nil {
meta.SetStatusCondition(&o.getServiceStatus().Conditions, getErrorCondition("Create", err))
return fmt.Errorf("failed to create service: %w", err)
}
} else {
reason = "Updated"
userConfig, err := UpdateUserConfiguration(o.getUserConfig())
if err != nil {
meta.SetStatusCondition(&o.getServiceStatus().Conditions, getErrorCondition("Update", err))
return err
}

Expand All @@ -101,6 +104,7 @@ func (h *genericServiceHandler) createOrUpdate(ctx context.Context, avn *aiven.C
}
_, err = avn.Services.Update(ctx, spec.Project, ometa.Name, req)
if err != nil {
meta.SetStatusCondition(&o.getServiceStatus().Conditions, getErrorCondition("Update", err))
return fmt.Errorf("failed to update service: %w", err)
}
}
Expand All @@ -116,6 +120,7 @@ func (h *genericServiceHandler) createOrUpdate(ctx context.Context, avn *aiven.C
}
_, err = avn.ServiceTags.Set(ctx, spec.Project, ometa.Name, req)
if err != nil {
meta.SetStatusCondition(&o.getServiceStatus().Conditions, getErrorCondition("Update", err))
return fmt.Errorf("failed to update tags: %w", err)
}

Expand Down Expand Up @@ -149,6 +154,7 @@ func (h *genericServiceHandler) delete(ctx context.Context, avn *aiven.Client, o
return true, nil
}

meta.SetStatusCondition(&o.getServiceStatus().Conditions, getErrorCondition("Delete", err))
return false, fmt.Errorf("failed to delete service in Aiven: %w", err)
}

Expand Down Expand Up @@ -191,6 +197,9 @@ func (h *genericServiceHandler) checkPreconditions(ctx context.Context, avn *aiv
// If not, the wrapper controller will try later
if s.IntegrationType == "read_replica" {
r, err := checkServiceIsRunning(ctx, avn, spec.Project, s.SourceServiceName)
if err != nil {
meta.SetStatusCondition(&o.getServiceStatus().Conditions, getErrorCondition("Preconditions", err))
}
if !(r && err == nil) {
return false, nil
}
Expand Down
7 changes: 6 additions & 1 deletion controllers/kafkaacl_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ func (h KafkaACLHandler) createOrUpdate(ctx context.Context, avn *aiven.Client,
// Tries to delete it instead
_, err = h.delete(ctx, avn, obj)
if err != nil {
meta.SetStatusCondition(&acl.Status.Conditions, getErrorCondition("Delete", err))
return err
}

Expand Down Expand Up @@ -160,7 +161,11 @@ func (h KafkaACLHandler) checkPreconditions(ctx context.Context, avn *aiven.Clie
meta.SetStatusCondition(&acl.Status.Conditions,
getInitializedCondition("Preconditions", "Checking preconditions"))

return checkServiceIsRunning(ctx, avn, acl.Spec.Project, acl.Spec.ServiceName)
running, err := checkServiceIsRunning(ctx, avn, acl.Spec.Project, acl.Spec.ServiceName)
if err != nil {
meta.SetStatusCondition(&acl.Status.Conditions, getErrorCondition("Delete", err))
}
return running, err
}

func (h KafkaACLHandler) convert(i client.Object) (*v1alpha1.KafkaACL, error) {
Expand Down
11 changes: 10 additions & 1 deletion controllers/kafkaconnector_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,24 +52,28 @@ func (h KafkaConnectorHandler) createOrUpdate(ctx context.Context, avn *aiven.Cl

exists, err := h.exists(ctx, avn, conn)
if err != nil {
meta.SetStatusCondition(&conn.Status.Conditions, getErrorCondition("CheckExists", err))
return fmt.Errorf("unable to check if kafka connector exists: %w", err)
}

connCfg, err := h.buildConnectorConfig(conn)
if err != nil {
meta.SetStatusCondition(&conn.Status.Conditions, getErrorCondition("BuildConfig", err))
return fmt.Errorf("unable to build connector config: %w", err)
}

var reason string
if !exists {
err = avn.KafkaConnectors.Create(ctx, conn.Spec.Project, conn.Spec.ServiceName, connCfg)
if err != nil && !aiven.IsAlreadyExists(err) {
meta.SetStatusCondition(&conn.Status.Conditions, getErrorCondition("Create", err))
return err
}
reason = "Created"
} else {
_, err := avn.KafkaConnectors.Update(ctx, conn.Spec.Project, conn.Spec.ServiceName, conn.Name, connCfg)
if err != nil {
meta.SetStatusCondition(&conn.Status.Conditions, getErrorCondition("Update", err))
return err
}
reason = "Updated"
Expand Down Expand Up @@ -141,6 +145,7 @@ func (h KafkaConnectorHandler) delete(ctx context.Context, avn *aiven.Client, ob
}
err = avn.KafkaConnectors.Delete(ctx, conn.Spec.Project, conn.Spec.ServiceName, conn.Name)
if err != nil && !aiven.IsNotFound(err) {
meta.SetStatusCondition(&conn.Status.Conditions, getErrorCondition("Delete", err))
return false, fmt.Errorf("unable to delete kafka connector: %w", err)
}
return true, nil
Expand Down Expand Up @@ -215,7 +220,11 @@ func (h KafkaConnectorHandler) checkPreconditions(ctx context.Context, avn *aive
meta.SetStatusCondition(&conn.Status.Conditions,
getInitializedCondition("Preconditions", "Checking preconditions"))

return checkServiceIsRunning(ctx, avn, conn.Spec.Project, conn.Spec.ServiceName)
running, err := checkServiceIsRunning(ctx, avn, conn.Spec.Project, conn.Spec.ServiceName)
if err != nil {
meta.SetStatusCondition(&conn.Status.Conditions, getErrorCondition("Preconditions", err))
}
return running, err
}

func (h KafkaConnectorHandler) convert(o client.Object) (*v1alpha1.KafkaConnector, error) {
Expand Down
10 changes: 9 additions & 1 deletion controllers/kafkatopic_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ func (h KafkaTopicHandler) createOrUpdate(ctx context.Context, avn *aiven.Client

exists, err := h.exists(ctx, avn, topic)
if err != nil {
meta.SetStatusCondition(&topic.Status.Conditions, getErrorCondition("CheckExists", err))
return err
}

Expand All @@ -66,6 +67,7 @@ func (h KafkaTopicHandler) createOrUpdate(ctx context.Context, avn *aiven.Client
Config: convertKafkaTopicConfig(topic),
})
if err != nil && !aiven.IsAlreadyExists(err) {
meta.SetStatusCondition(&topic.Status.Conditions, getErrorCondition("Create", err))
return err
}

Expand All @@ -79,6 +81,7 @@ func (h KafkaTopicHandler) createOrUpdate(ctx context.Context, avn *aiven.Client
Config: convertKafkaTopicConfig(topic),
})
if err != nil {
meta.SetStatusCondition(&topic.Status.Conditions, getErrorCondition("Update", err))
return fmt.Errorf("cannot update Kafka Topic: %w", err)
}

Expand Down Expand Up @@ -112,6 +115,7 @@ func (h KafkaTopicHandler) delete(ctx context.Context, avn *aiven.Client, obj cl
// Delete project on Aiven side
err = avn.KafkaTopics.Delete(ctx, topic.Spec.Project, topic.Spec.ServiceName, topic.GetTopicName())
if err != nil && !aiven.IsNotFound(err) {
meta.SetStatusCondition(&topic.Status.Conditions, getErrorCondition("Delete", err))
return false, err
}

Expand Down Expand Up @@ -168,7 +172,11 @@ func (h KafkaTopicHandler) checkPreconditions(ctx context.Context, avn *aiven.Cl
meta.SetStatusCondition(&topic.Status.Conditions,
getInitializedCondition("Preconditions", "Checking preconditions"))

return checkServiceIsRunning(ctx, avn, topic.Spec.Project, topic.Spec.ServiceName)
running, err := checkServiceIsRunning(ctx, avn, topic.Spec.Project, topic.Spec.ServiceName)
if err != nil {
meta.SetStatusCondition(&topic.Status.Conditions, getErrorCondition("Preconditions", err))
}
return running, err
}

func (h KafkaTopicHandler) getState(ctx context.Context, avn *aiven.Client, topic *v1alpha1.KafkaTopic) (string, error) {
Expand Down
1 change: 1 addition & 0 deletions controllers/project_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ func (h ProjectHandler) createOrUpdate(ctx context.Context, avn *aiven.Client, o

exists, err := h.exists(ctx, avn, project)
if err != nil {
meta.SetStatusCondition(&project.Status.Conditions, getErrorCondition("CheckExists", err))
return fmt.Errorf("project does not exists: %w", err)
}

Expand Down

0 comments on commit 3cffe03

Please sign in to comment.