Skip to content

Commit

Permalink
fix(kafkatopic): wait nodes running
Browse files Browse the repository at this point in the history
  • Loading branch information
byashimov committed Jul 22, 2024
1 parent a500263 commit 4f70c04
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 8 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

## [MAJOR.MINOR.PATCH] - YYYY-MM-DD

- Fix `KafkaTopic`: fails to create a topic with the replication factor set more than running Kafka nodes

## v0.24.0 - 2024-07-16

- Fix `PostgreSQL`: wait for a valid backup to create read replica
Expand Down
16 changes: 9 additions & 7 deletions controllers/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,16 +58,18 @@ func checkServiceIsRunning(ctx context.Context, _ *aiven.Client, avnGen avngen.C
}
return false, err
}
return serviceIsRunning(s.State), nil
return serviceIsOperational(s.State), nil
}

// serviceIsRunning returns "true" when a service is in operational state, i.e. "running"
// serviceIsOperational returns "true" when a service is in operational state, i.e. "running"
func serviceIsOperational[T service.ServiceStateType | string](state T) bool {
s := service.ServiceStateType(state)
return s == service.ServiceStateTypeRebalancing || serviceIsRunning(s)
}

// serviceIsRunning returns "true" when a service is RUNNING state on Aive side
func serviceIsRunning[T service.ServiceStateType | string](state T) bool {
switch service.ServiceStateType(state) {
case service.ServiceStateTypeRunning, service.ServiceStateTypeRebalancing:
return true
}
return false
return service.ServiceStateType(state) == service.ServiceStateTypeRunning
}

func getInitializedCondition(reason, message string) metav1.Condition {
Expand Down
20 changes: 19 additions & 1 deletion controllers/kafkatopic_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/aiven/aiven-go-client/v2"
avngen "github.com/aiven/go-client-codegen"
"github.com/aiven/go-client-codegen/handler/service"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -174,7 +175,24 @@ func (h KafkaTopicHandler) checkPreconditions(ctx context.Context, avn *aiven.Cl
meta.SetStatusCondition(&topic.Status.Conditions,
getInitializedCondition("Preconditions", "Checking preconditions"))

return checkServiceIsRunning(ctx, avn, avnGen, topic.Spec.Project, topic.Spec.ServiceName)
s, err := avnGen.ServiceGet(ctx, topic.Spec.Project, topic.Spec.ServiceName)
if isNotFound(err) {
return false, nil
}

if err != nil {
return false, err
}

running := 0
for _, node := range s.NodeStates {
if node.State == service.NodeStateTypeRunning {
running++
}
}
// Replication factor requires enough nodes running.
// But we want to get the backend validation error if the value is too high
return running >= min(len(s.NodeStates), topic.Spec.Replication), nil
}

func (h KafkaTopicHandler) getState(ctx context.Context, avn *aiven.Client, topic *v1alpha1.KafkaTopic) (string, error) {
Expand Down

0 comments on commit 4f70c04

Please sign in to comment.