From 199ebfb5da2b575fea6b0b2fcf0f958753b1de59 Mon Sep 17 00:00:00 2001 From: Murad Biashimov Date: Wed, 24 Jul 2024 15:02:22 +0200 Subject: [PATCH] fix(kafkaschema): poll 404 --- CHANGELOG.md | 1 + api/v1alpha1/kafkaschema_types.go | 4 ++ .../templates/aiven.io_kafkaschemas.yaml | 7 ++ config/crd/bases/aiven.io_kafkaschemas.yaml | 7 ++ controllers/common.go | 26 +++++++ controllers/kafkaschema_controller.go | 72 +++++++++---------- controllers/kafkatopic_controller.go | 18 +---- docs/docs/api-reference/kafkaschema.md | 1 + go.mod | 1 + go.sum | 2 + 10 files changed, 84 insertions(+), 55 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 63eb9db9..a6da66b8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,7 @@ - Fix `KafkaTopic`: fails to create a topic with the replication factor set more than running Kafka nodes - Fix `ServiceIntegration`: sends empty source and destination projects +- Add `KafkaSchema` field `schemaType`, type `string`: Schema type ## v0.24.0 - 2024-07-16 diff --git a/api/v1alpha1/kafkaschema_types.go b/api/v1alpha1/kafkaschema_types.go index aa958d45..f9ac409f 100644 --- a/api/v1alpha1/kafkaschema_types.go +++ b/api/v1alpha1/kafkaschema_types.go @@ -17,6 +17,10 @@ type KafkaSchemaSpec struct { // Kafka Schema configuration should be a valid Avro Schema JSON format Schema string `json:"schema"` + // +kubebuilder:validation:Enum=AVRO;JSON;PROTOBUF + // Schema type + SchemaType string `json:"schemaType,omitempty"` + // +kubebuilder:validation:Enum=BACKWARD;BACKWARD_TRANSITIVE;FORWARD;FORWARD_TRANSITIVE;FULL;FULL_TRANSITIVE;NONE // Kafka Schemas compatibility level CompatibilityLevel string `json:"compatibilityLevel,omitempty"` diff --git a/charts/aiven-operator-crds/templates/aiven.io_kafkaschemas.yaml b/charts/aiven-operator-crds/templates/aiven.io_kafkaschemas.yaml index d227a408..5e34946f 100644 --- a/charts/aiven-operator-crds/templates/aiven.io_kafkaschemas.yaml +++ b/charts/aiven-operator-crds/templates/aiven.io_kafkaschemas.yaml @@ -92,6 +92,13 @@ spec: Kafka Schema configuration should be a valid Avro Schema JSON format type: string + schemaType: + description: Schema type + enum: + - AVRO + - JSON + - PROTOBUF + type: string serviceName: description: Specifies the name of the service that this resource diff --git a/config/crd/bases/aiven.io_kafkaschemas.yaml b/config/crd/bases/aiven.io_kafkaschemas.yaml index d227a408..5e34946f 100644 --- a/config/crd/bases/aiven.io_kafkaschemas.yaml +++ b/config/crd/bases/aiven.io_kafkaschemas.yaml @@ -92,6 +92,13 @@ spec: Kafka Schema configuration should be a valid Avro Schema JSON format type: string + schemaType: + description: Schema type + enum: + - AVRO + - JSON + - PROTOBUF + type: string serviceName: description: Specifies the name of the service that this resource diff --git a/controllers/common.go b/controllers/common.go index 4e102c90..c7df9473 100644 --- a/controllers/common.go +++ b/controllers/common.go @@ -72,6 +72,32 @@ func serviceIsRunning[T service.ServiceStateType | string](state T) bool { return service.ServiceStateType(state) == service.ServiceStateTypeRunning } +const allNodesRunning = -1 + +func minNodesRunning(ctx context.Context, avnGen avngen.Client, project, serviceName string, nodesRunning int) (bool, error) { + s, err := avnGen.ServiceGet(ctx, project, serviceName) + if isNotFound(err) { + return false, nil + } + + if err != nil { + return false, err + } + + running := 0 + for _, node := range s.NodeStates { + if node.State == service.NodeStateTypeRunning { + running++ + } + } + + if nodesRunning == allNodesRunning { + nodesRunning = len(s.NodeStates) + } + + return running >= min(len(s.NodeStates), nodesRunning), nil +} + func getInitializedCondition(reason, message string) metav1.Condition { return metav1.Condition{ Type: conditionTypeInitialized, diff --git a/controllers/kafkaschema_controller.go b/controllers/kafkaschema_controller.go index cd4d3cc2..cf78662f 100644 --- a/controllers/kafkaschema_controller.go +++ b/controllers/kafkaschema_controller.go @@ -6,9 +6,13 @@ import ( "context" "fmt" "strconv" + "time" "github.com/aiven/aiven-go-client/v2" avngen "github.com/aiven/go-client-codegen" + "github.com/aiven/go-client-codegen/handler/kafkaschemaregistry" + "github.com/avast/retry-go" + "golang.org/x/exp/slices" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -43,20 +47,20 @@ func (r *KafkaSchemaReconciler) SetupWithManager(mgr ctrl.Manager) error { Complete(r) } -func (h KafkaSchemaHandler) createOrUpdate(ctx context.Context, avn *aiven.Client, avnGen avngen.Client, obj client.Object, refs []client.Object) error { +func (h KafkaSchemaHandler) createOrUpdate(ctx context.Context, avn *aiven.Client, avnGen avngen.Client, obj client.Object, _ []client.Object) error { schema, err := h.convert(obj) if err != nil { return err } - // createOrUpdate Kafka Schema Subject - _, err = avn.KafkaSubjectSchemas.Add( + _, err = avnGen.ServiceSchemaRegistrySubjectVersionPost( ctx, schema.Spec.Project, schema.Spec.ServiceName, schema.Spec.SubjectName, - aiven.KafkaSchemaSubject{ - Schema: schema.Spec.Schema, + &kafkaschemaregistry.ServiceSchemaRegistrySubjectVersionPostIn{ + Schema: schema.Spec.Schema, + SchemaType: kafkaschemaregistry.SchemaType(schema.Spec.SchemaType), }, ) if err != nil { @@ -65,26 +69,42 @@ func (h KafkaSchemaHandler) createOrUpdate(ctx context.Context, avn *aiven.Clien // set compatibility level if defined for a newly created Kafka Schema Subject if schema.Spec.CompatibilityLevel != "" { - _, err := avn.KafkaSubjectSchemas.UpdateConfiguration( + _, err := avnGen.ServiceSchemaRegistrySubjectConfigPut( ctx, schema.Spec.Project, schema.Spec.ServiceName, schema.Spec.SubjectName, - schema.Spec.CompatibilityLevel, + &kafkaschemaregistry.ServiceSchemaRegistrySubjectConfigPutIn{ + Compatibility: kafkaschemaregistry.CompatibilityType(schema.Spec.CompatibilityLevel), + }, ) if err != nil { return fmt.Errorf("cannot update Kafka Schema Configuration: %w", err) } } - // get last version - version, err := h.getLastVersion(ctx, avn, schema) + // Gets the last version + const ( + pollDelay = 5 * time.Second + pollAttempts = 10 + ) + err = retry.Do( + func() error { + versions, err := avnGen.ServiceSchemaRegistrySubjectVersionsGet(ctx, schema.Spec.Project, schema.Spec.ServiceName, schema.Spec.SubjectName) + if err == nil && len(versions) > 0 { + schema.Status.Version = slices.Max(versions) + } + return err + }, + retry.Context(ctx), + retry.RetryIf(isNotFound), + retry.Delay(pollDelay), + retry.Attempts(pollAttempts), + ) if err != nil { return fmt.Errorf("cannot get Kafka Schema Subject version: %w", err) } - schema.Status.Version = version - meta.SetStatusCondition(&schema.Status.Conditions, getInitializedCondition("Added", "Successfully created or updated the instance in Aiven")) @@ -105,12 +125,8 @@ func (h KafkaSchemaHandler) delete(ctx context.Context, avn *aiven.Client, avnGe return false, err } - err = avn.KafkaSubjectSchemas.Delete(ctx, schema.Spec.Project, schema.Spec.ServiceName, schema.Spec.SubjectName) - if err != nil && !isNotFound(err) { - return false, fmt.Errorf("aiven client delete Kafka Schema error: %w", err) - } - - return true, nil + err = avnGen.ServiceSchemaRegistrySubjectDelete(ctx, schema.Spec.Project, schema.Spec.ServiceName, schema.Spec.SubjectName) + return isDeleted(err) } func (h KafkaSchemaHandler) get(ctx context.Context, avn *aiven.Client, avnGen avngen.Client, obj client.Object) (*corev1.Secret, error) { @@ -134,7 +150,7 @@ func (h KafkaSchemaHandler) checkPreconditions(ctx context.Context, avn *aiven.C return false, err } - return checkServiceIsRunning(ctx, avn, avnGen, schema.Spec.Project, schema.Spec.ServiceName) + return minNodesRunning(ctx, avnGen, schema.Spec.Project, schema.Spec.ServiceName, allNodesRunning) } func (h KafkaSchemaHandler) convert(i client.Object) (*v1alpha1.KafkaSchema, error) { @@ -145,23 +161,3 @@ func (h KafkaSchemaHandler) convert(i client.Object) (*v1alpha1.KafkaSchema, err return schema, nil } - -func (h KafkaSchemaHandler) getLastVersion(ctx context.Context, avn *aiven.Client, schema *v1alpha1.KafkaSchema) (int, error) { - ver, err := avn.KafkaSubjectSchemas.GetVersions( - ctx, - schema.Spec.Project, - schema.Spec.ServiceName, - schema.Spec.SubjectName) - if err != nil { - return 0, err - } - - var latestVersion int - for _, v := range ver.Versions { - if v > latestVersion { - latestVersion = v - } - } - - return latestVersion, nil -} diff --git a/controllers/kafkatopic_controller.go b/controllers/kafkatopic_controller.go index 4284ec34..121a0694 100644 --- a/controllers/kafkatopic_controller.go +++ b/controllers/kafkatopic_controller.go @@ -9,7 +9,6 @@ import ( "github.com/aiven/aiven-go-client/v2" avngen "github.com/aiven/go-client-codegen" - "github.com/aiven/go-client-codegen/handler/service" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -175,24 +174,9 @@ func (h KafkaTopicHandler) checkPreconditions(ctx context.Context, avn *aiven.Cl meta.SetStatusCondition(&topic.Status.Conditions, getInitializedCondition("Preconditions", "Checking preconditions")) - s, err := avnGen.ServiceGet(ctx, topic.Spec.Project, topic.Spec.ServiceName) - if isNotFound(err) { - return false, nil - } - - if err != nil { - return false, err - } - - running := 0 - for _, node := range s.NodeStates { - if node.State == service.NodeStateTypeRunning { - running++ - } - } // Replication factor requires enough nodes running. // But we want to get the backend validation error if the value is too high - return running >= min(len(s.NodeStates), topic.Spec.Replication), nil + return minNodesRunning(ctx, avnGen, topic.Spec.Project, topic.Spec.ServiceName, topic.Spec.Replication) } func (h KafkaTopicHandler) getState(ctx context.Context, avn *aiven.Client, topic *v1alpha1.KafkaTopic) (string, error) { diff --git a/docs/docs/api-reference/kafkaschema.md b/docs/docs/api-reference/kafkaschema.md index f2ac1a57..537b10a3 100644 --- a/docs/docs/api-reference/kafkaschema.md +++ b/docs/docs/api-reference/kafkaschema.md @@ -84,6 +84,7 @@ KafkaSchemaSpec defines the desired state of KafkaSchema. - [`authSecretRef`](#spec.authSecretRef-property){: name='spec.authSecretRef-property'} (object). Authentication reference to Aiven token in a secret. See below for [nested schema](#spec.authSecretRef). - [`compatibilityLevel`](#spec.compatibilityLevel-property){: name='spec.compatibilityLevel-property'} (string, Enum: `BACKWARD`, `BACKWARD_TRANSITIVE`, `FORWARD`, `FORWARD_TRANSITIVE`, `FULL`, `FULL_TRANSITIVE`, `NONE`). Kafka Schemas compatibility level. +- [`schemaType`](#spec.schemaType-property){: name='spec.schemaType-property'} (string, Enum: `AVRO`, `JSON`, `PROTOBUF`). Schema type. ## authSecretRef {: #spec.authSecretRef } diff --git a/go.mod b/go.mod index f4342998..e9e261f7 100644 --- a/go.mod +++ b/go.mod @@ -33,6 +33,7 @@ require ( require ( github.com/ClickHouse/ch-go v0.61.5 // indirect github.com/andybalholm/brotli v1.1.0 // indirect + github.com/avast/retry-go v3.0.0+incompatible // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.1.2 // indirect github.com/davecgh/go-spew v1.1.1 // indirect diff --git a/go.sum b/go.sum index bdb36047..6015f167 100644 --- a/go.sum +++ b/go.sum @@ -50,6 +50,8 @@ github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRF github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho= github.com/andybalholm/brotli v1.1.0 h1:eLKJA0d02Lf0mVpIDgYnqXcUn0GqVmEFny3VuID1U3M= github.com/andybalholm/brotli v1.1.0/go.mod h1:sms7XGricyQI9K10gOSf56VKKWS4oLer58Q+mhRPtnY= +github.com/avast/retry-go v3.0.0+incompatible h1:4SOWQ7Qs+oroOTQOYnAHqelpCO0biHSxpiH9JdtuBj0= +github.com/avast/retry-go v3.0.0+incompatible/go.mod h1:XtSnn+n/sHqQIpZ10K1qAevBhOOCWBLXXy3hyiqqBrY= github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=