diff --git a/CHANGELOG.md b/CHANGELOG.md index 488f2d6e..60779085 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,8 @@ ## [MAJOR.MINOR.PATCH] - YYYY-MM-DD +- Fix `KafkaTopic`: fails to create a topic with the replication factor set more than running Kafka nodes + ## v0.24.0 - 2024-07-16 - Fix `PostgreSQL`: wait for a valid backup to create read replica diff --git a/controllers/common.go b/controllers/common.go index bc2a22ab..bf70d5a3 100644 --- a/controllers/common.go +++ b/controllers/common.go @@ -58,16 +58,18 @@ func checkServiceIsRunning(ctx context.Context, _ *aiven.Client, avnGen avngen.C } return false, err } - return serviceIsRunning(s.State), nil + return serviceIsOperational(s.State), nil } -// serviceIsRunning returns "true" when a service is in operational state, i.e. "running" +// serviceIsOperational returns "true" when a service is in operational state, i.e. "running" +func serviceIsOperational[T service.ServiceStateType | string](state T) bool { + s := service.ServiceStateType(state) + return s == service.ServiceStateTypeRebalancing || serviceIsRunning(s) +} + +// serviceIsRunning returns "true" when a service is RUNNING state on Aive side func serviceIsRunning[T service.ServiceStateType | string](state T) bool { - switch service.ServiceStateType(state) { - case service.ServiceStateTypeRunning, service.ServiceStateTypeRebalancing: - return true - } - return false + return service.ServiceStateType(state) == service.ServiceStateTypeRunning } func getInitializedCondition(reason, message string) metav1.Condition { diff --git a/controllers/kafkatopic_controller.go b/controllers/kafkatopic_controller.go index 86083829..4284ec34 100644 --- a/controllers/kafkatopic_controller.go +++ b/controllers/kafkatopic_controller.go @@ -9,6 +9,7 @@ import ( "github.com/aiven/aiven-go-client/v2" avngen "github.com/aiven/go-client-codegen" + "github.com/aiven/go-client-codegen/handler/service" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -174,7 +175,24 @@ func (h KafkaTopicHandler) checkPreconditions(ctx context.Context, avn *aiven.Cl meta.SetStatusCondition(&topic.Status.Conditions, getInitializedCondition("Preconditions", "Checking preconditions")) - return checkServiceIsRunning(ctx, avn, avnGen, topic.Spec.Project, topic.Spec.ServiceName) + s, err := avnGen.ServiceGet(ctx, topic.Spec.Project, topic.Spec.ServiceName) + if isNotFound(err) { + return false, nil + } + + if err != nil { + return false, err + } + + running := 0 + for _, node := range s.NodeStates { + if node.State == service.NodeStateTypeRunning { + running++ + } + } + // Replication factor requires enough nodes running. + // But we want to get the backend validation error if the value is too high + return running >= min(len(s.NodeStates), topic.Spec.Replication), nil } func (h KafkaTopicHandler) getState(ctx context.Context, avn *aiven.Client, topic *v1alpha1.KafkaTopic) (string, error) {