Skip to content

Commit

Permalink
fix(kafkatopic): wait service running
Browse files Browse the repository at this point in the history
  • Loading branch information
byashimov committed Jul 16, 2024
1 parent e401591 commit 7afde9e
Show file tree
Hide file tree
Showing 15 changed files with 32 additions and 21 deletions.
2 changes: 1 addition & 1 deletion controllers/clickhousedatabase_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func (h *ClickhouseDatabaseHandler) checkPreconditions(ctx context.Context, avn
meta.SetStatusCondition(&db.Status.Conditions,
getInitializedCondition("Preconditions", "Checking preconditions"))

return checkServiceIsRunning(ctx, avn, avnGen, db.Spec.Project, db.Spec.ServiceName)
return checkServiceIsRunning(ctx, avnGen, db.Spec.Project, db.Spec.ServiceName)
}

func (h *ClickhouseDatabaseHandler) convert(i client.Object) (*v1alpha1.ClickhouseDatabase, error) {
Expand Down
2 changes: 1 addition & 1 deletion controllers/clickhousegrant_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func (h *ClickhouseGrantHandler) checkPreconditions(ctx context.Context, avn *ai
meta.SetStatusCondition(&g.Status.Conditions,
getInitializedCondition("Preconditions", "Checking preconditions"))

serviceIsRunning, err := checkServiceIsRunning(ctx, avn, avnGen, g.Spec.Project, g.Spec.ServiceName)
serviceIsRunning, err := checkServiceIsRunning(ctx, avnGen, g.Spec.Project, g.Spec.ServiceName)
if !serviceIsRunning || err != nil {
return false, err
}
Expand Down
2 changes: 1 addition & 1 deletion controllers/clickhouserole_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func (h *clickhouseRoleHandler) checkPreconditions(ctx context.Context, avn *aiv
meta.SetStatusCondition(&role.Status.Conditions,
getInitializedCondition("Preconditions", "Checking preconditions"))

return checkServiceIsRunning(ctx, avn, avnGen, role.Spec.Project, role.Spec.ServiceName)
return checkServiceIsRunning(ctx, avnGen, role.Spec.Project, role.Spec.ServiceName)
}

func (h *clickhouseRoleHandler) convert(i client.Object) (*v1alpha1.ClickhouseRole, error) {
Expand Down
2 changes: 1 addition & 1 deletion controllers/clickhouseuser_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ func (h *clickhouseUserHandler) checkPreconditions(ctx context.Context, avn *aiv
meta.SetStatusCondition(&user.Status.Conditions,
getInitializedCondition("Preconditions", "Checking preconditions"))

return checkServiceIsRunning(ctx, avn, avnGen, user.Spec.Project, user.Spec.ServiceName)
return checkServiceIsRunning(ctx, avnGen, user.Spec.Project, user.Spec.ServiceName)
}

func (h *clickhouseUserHandler) convert(i client.Object) (*v1alpha1.ClickhouseUser, error) {
Expand Down
8 changes: 4 additions & 4 deletions controllers/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ const (

var errTerminationProtectionOn = errors.New("termination protection is on")

func checkServiceIsRunning(ctx context.Context, _ *aiven.Client, avnGen avngen.Client, project, serviceName string) (bool, error) {
func checkServiceIsRunning(ctx context.Context, avnGen avngen.Client, project, serviceName string) (bool, error) {
s, err := avnGen.ServiceGet(ctx, project, serviceName)
if err != nil {
// if service is not found, it is not running
Expand All @@ -58,11 +58,11 @@ func checkServiceIsRunning(ctx context.Context, _ *aiven.Client, avnGen avngen.C
}
return false, err
}
return serviceIsRunning(s.State), nil
return serviceIsOperational(s.State), nil
}

// serviceIsRunning returns "true" when a service is in operational state, i.e. "running"
func serviceIsRunning[T service.ServiceStateType | string](state T) bool {
// serviceIsOperational returns "true" when a service is in operational state, i.e. "running"
func serviceIsOperational[T service.ServiceStateType | string](state T) bool {
switch service.ServiceStateType(state) {
case service.ServiceStateTypeRunning, service.ServiceStateTypeRebalancing:
return true
Expand Down
2 changes: 1 addition & 1 deletion controllers/connectionpool_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ func (h ConnectionPoolHandler) checkPreconditions(ctx context.Context, avn *aive
meta.SetStatusCondition(&cp.Status.Conditions,
getInitializedCondition("Preconditions", "Checking preconditions"))

isRunning, err := checkServiceIsRunning(ctx, avn, avnGen, cp.Spec.Project, cp.Spec.ServiceName)
isRunning, err := checkServiceIsRunning(ctx, avnGen, cp.Spec.Project, cp.Spec.ServiceName)
if err != nil {
return false, err
}
Expand Down
2 changes: 1 addition & 1 deletion controllers/database_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func (h DatabaseHandler) checkPreconditions(ctx context.Context, avn *aiven.Clie
meta.SetStatusCondition(&db.Status.Conditions,
getInitializedCondition("Preconditions", "Checking preconditions"))

return checkServiceIsRunning(ctx, avn, avnGen, db.Spec.Project, db.Spec.ServiceName)
return checkServiceIsRunning(ctx, avnGen, db.Spec.Project, db.Spec.ServiceName)
}

func (h DatabaseHandler) convert(i client.Object) (*v1alpha1.Database, error) {
Expand Down
6 changes: 3 additions & 3 deletions controllers/generic_service_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,13 +174,13 @@ func (h *genericServiceHandler) get(ctx context.Context, avn *aiven.Client, avnG
}

spec := o.getServiceCommonSpec()
s, err := avn.Services.Get(ctx, spec.Project, o.getObjectMeta().Name)
s, err := avn.Services.Get(ctx, spec.Project, o.GetName())
if err != nil {
return nil, fmt.Errorf("failed to get service from Aiven: %w", err)
}

status := o.getServiceStatus()
if !serviceIsRunning(s.State) {
if !serviceIsOperational(s.State) {
status.State = s.State
return nil, nil
}
Expand Down Expand Up @@ -232,7 +232,7 @@ func (h *genericServiceHandler) checkPreconditions(ctx context.Context, avn *aiv
// Validates that read_replica is running
// If not, the wrapper controller will try later
if s.IntegrationType == "read_replica" {
r, err := checkServiceIsRunning(ctx, avn, avnGen, spec.Project, s.SourceServiceName)
r, err := checkServiceIsRunning(ctx, avnGen, spec.Project, s.SourceServiceName)
if !r || err != nil {
return false, err
}
Expand Down
2 changes: 1 addition & 1 deletion controllers/kafkaacl_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ func (h KafkaACLHandler) checkPreconditions(ctx context.Context, avn *aiven.Clie
meta.SetStatusCondition(&acl.Status.Conditions,
getInitializedCondition("Preconditions", "Checking preconditions"))

return checkServiceIsRunning(ctx, avn, avnGen, acl.Spec.Project, acl.Spec.ServiceName)
return checkServiceIsRunning(ctx, avnGen, acl.Spec.Project, acl.Spec.ServiceName)
}

func (h KafkaACLHandler) convert(i client.Object) (*v1alpha1.KafkaACL, error) {
Expand Down
2 changes: 1 addition & 1 deletion controllers/kafkaconnector_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ func (h KafkaConnectorHandler) checkPreconditions(ctx context.Context, avn *aive
meta.SetStatusCondition(&conn.Status.Conditions,
getInitializedCondition("Preconditions", "Checking preconditions"))

return checkServiceIsRunning(ctx, avn, avnGen, conn.Spec.Project, conn.Spec.ServiceName)
return checkServiceIsRunning(ctx, avnGen, conn.Spec.Project, conn.Spec.ServiceName)
}

func (h KafkaConnectorHandler) convert(o client.Object) (*v1alpha1.KafkaConnector, error) {
Expand Down
2 changes: 1 addition & 1 deletion controllers/kafkaschema_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func (h KafkaSchemaHandler) checkPreconditions(ctx context.Context, avn *aiven.C
return false, err
}

return checkServiceIsRunning(ctx, avn, avnGen, schema.Spec.Project, schema.Spec.ServiceName)
return checkServiceIsRunning(ctx, avnGen, schema.Spec.Project, schema.Spec.ServiceName)
}

func (h KafkaSchemaHandler) convert(i client.Object) (*v1alpha1.KafkaSchema, error) {
Expand Down
2 changes: 1 addition & 1 deletion controllers/kafkaschemaregistryacl_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ func (h KafkaSchemaRegistryACLHandler) checkPreconditions(ctx context.Context, a
meta.SetStatusCondition(&acl.Status.Conditions,
getInitializedCondition("Preconditions", "Checking preconditions"))

return checkServiceIsRunning(ctx, avn, avnGen, acl.Spec.Project, acl.Spec.ServiceName)
return checkServiceIsRunning(ctx, avnGen, acl.Spec.Project, acl.Spec.ServiceName)
}

func (h KafkaSchemaRegistryACLHandler) convert(i client.Object) (*v1alpha1.KafkaSchemaRegistryACL, error) {
Expand Down
13 changes: 12 additions & 1 deletion controllers/kafkatopic_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/aiven/aiven-go-client/v2"
avngen "github.com/aiven/go-client-codegen"
"github.com/aiven/go-client-codegen/handler/service"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -174,7 +175,17 @@ func (h KafkaTopicHandler) checkPreconditions(ctx context.Context, avn *aiven.Cl
meta.SetStatusCondition(&topic.Status.Conditions,
getInitializedCondition("Preconditions", "Checking preconditions"))

return checkServiceIsRunning(ctx, avn, avnGen, topic.Spec.Project, topic.Spec.ServiceName)
if topic.Spec.Replication != 0 {
// Replication factor requires all nodes running
s, err := avnGen.ServiceGet(ctx, topic.Spec.Project, topic.Spec.ServiceName)
if isNotFound(err) {
return false, nil
}

return s.State == service.ServiceStateTypeRunning, nil
}

return checkServiceIsRunning(ctx, avnGen, topic.Spec.Project, topic.Spec.ServiceName)
}

func (h KafkaTopicHandler) getState(ctx context.Context, avn *aiven.Client, topic *v1alpha1.KafkaTopic) (string, error) {
Expand Down
4 changes: 2 additions & 2 deletions controllers/serviceintegration_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ func (h ServiceIntegrationHandler) checkPreconditions(ctx context.Context, avn *
if project == "" {
project = si.Spec.Project
}
running, err := checkServiceIsRunning(ctx, avn, avnGen, project, si.Spec.SourceServiceName)
running, err := checkServiceIsRunning(ctx, avnGen, project, si.Spec.SourceServiceName)
if !running || err != nil {
return false, err
}
Expand All @@ -185,7 +185,7 @@ func (h ServiceIntegrationHandler) checkPreconditions(ctx context.Context, avn *
if project == "" {
project = si.Spec.Project
}
running, err := checkServiceIsRunning(ctx, avn, avnGen, project, si.Spec.DestinationServiceName)
running, err := checkServiceIsRunning(ctx, avnGen, project, si.Spec.DestinationServiceName)
if !running || err != nil {
return false, err
}
Expand Down
2 changes: 1 addition & 1 deletion controllers/serviceuser_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ func (h ServiceUserHandler) checkPreconditions(ctx context.Context, avn *aiven.C
meta.SetStatusCondition(&user.Status.Conditions,
getInitializedCondition("Preconditions", "Checking preconditions"))

return checkServiceIsRunning(ctx, avn, avnGen, user.Spec.Project, user.Spec.ServiceName)
return checkServiceIsRunning(ctx, avnGen, user.Spec.Project, user.Spec.ServiceName)
}

func (h ServiceUserHandler) convert(i client.Object) (*v1alpha1.ServiceUser, error) {
Expand Down

0 comments on commit 7afde9e

Please sign in to comment.