From 8958a3973173e219cbb84f8065cad3d94caa2782 Mon Sep 17 00:00:00 2001 From: Murad Biashimov Date: Wed, 24 Jul 2024 15:02:22 +0200 Subject: [PATCH] fix(kafkaschema): poll schema version --- CHANGELOG.md | 1 + api/v1alpha1/kafkaschema_types.go | 4 + .../templates/aiven.io_kafkaschemas.yaml | 7 ++ config/crd/bases/aiven.io_kafkaschemas.yaml | 7 ++ controllers/common.go | 26 ++++++ controllers/kafkaschema_controller.go | 85 +++++++++++-------- controllers/kafkatopic_controller.go | 18 +--- docs/docs/api-reference/kafkaschema.md | 1 + go.mod | 3 +- go.sum | 6 +- 10 files changed, 101 insertions(+), 57 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 63eb9db9..a6da66b8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,7 @@ - Fix `KafkaTopic`: fails to create a topic with the replication factor set more than running Kafka nodes - Fix `ServiceIntegration`: sends empty source and destination projects +- Add `KafkaSchema` field `schemaType`, type `string`: Schema type ## v0.24.0 - 2024-07-16 diff --git a/api/v1alpha1/kafkaschema_types.go b/api/v1alpha1/kafkaschema_types.go index aa958d45..f9ac409f 100644 --- a/api/v1alpha1/kafkaschema_types.go +++ b/api/v1alpha1/kafkaschema_types.go @@ -17,6 +17,10 @@ type KafkaSchemaSpec struct { // Kafka Schema configuration should be a valid Avro Schema JSON format Schema string `json:"schema"` + // +kubebuilder:validation:Enum=AVRO;JSON;PROTOBUF + // Schema type + SchemaType string `json:"schemaType,omitempty"` + // +kubebuilder:validation:Enum=BACKWARD;BACKWARD_TRANSITIVE;FORWARD;FORWARD_TRANSITIVE;FULL;FULL_TRANSITIVE;NONE // Kafka Schemas compatibility level CompatibilityLevel string `json:"compatibilityLevel,omitempty"` diff --git a/charts/aiven-operator-crds/templates/aiven.io_kafkaschemas.yaml b/charts/aiven-operator-crds/templates/aiven.io_kafkaschemas.yaml index d227a408..5e34946f 100644 --- a/charts/aiven-operator-crds/templates/aiven.io_kafkaschemas.yaml +++ b/charts/aiven-operator-crds/templates/aiven.io_kafkaschemas.yaml @@ -92,6 +92,13 @@ spec: Kafka Schema configuration should be a valid Avro Schema JSON format type: string + schemaType: + description: Schema type + enum: + - AVRO + - JSON + - PROTOBUF + type: string serviceName: description: Specifies the name of the service that this resource diff --git a/config/crd/bases/aiven.io_kafkaschemas.yaml b/config/crd/bases/aiven.io_kafkaschemas.yaml index d227a408..5e34946f 100644 --- a/config/crd/bases/aiven.io_kafkaschemas.yaml +++ b/config/crd/bases/aiven.io_kafkaschemas.yaml @@ -92,6 +92,13 @@ spec: Kafka Schema configuration should be a valid Avro Schema JSON format type: string + schemaType: + description: Schema type + enum: + - AVRO + - JSON + - PROTOBUF + type: string serviceName: description: Specifies the name of the service that this resource diff --git a/controllers/common.go b/controllers/common.go index 6d2c50ea..770cb3ab 100644 --- a/controllers/common.go +++ b/controllers/common.go @@ -72,6 +72,32 @@ func serviceIsRunning[T service.ServiceStateType | string](state T) bool { return service.ServiceStateType(state) == service.ServiceStateTypeRunning } +const allNodesRunning = -1 + +func minNodesRunning(ctx context.Context, avnGen avngen.Client, project, serviceName string, nodesRunning int) (bool, error) { + s, err := avnGen.ServiceGet(ctx, project, serviceName) + if isNotFound(err) { + return false, nil + } + + if err != nil { + return false, err + } + + running := 0 + for _, node := range s.NodeStates { + if node.State == service.NodeStateTypeRunning { + running++ + } + } + + if nodesRunning == allNodesRunning { + nodesRunning = len(s.NodeStates) + } + + return running >= min(len(s.NodeStates), nodesRunning), nil +} + func getInitializedCondition(reason, message string) metav1.Condition { return metav1.Condition{ Type: conditionTypeInitialized, diff --git a/controllers/kafkaschema_controller.go b/controllers/kafkaschema_controller.go index d103a2d8..2692a2c8 100644 --- a/controllers/kafkaschema_controller.go +++ b/controllers/kafkaschema_controller.go @@ -6,9 +6,12 @@ import ( "context" "fmt" "strconv" + "time" "github.com/aiven/aiven-go-client/v2" avngen "github.com/aiven/go-client-codegen" + "github.com/aiven/go-client-codegen/handler/kafkaschemaregistry" + "github.com/avast/retry-go" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -43,20 +46,32 @@ func (r *KafkaSchemaReconciler) SetupWithManager(mgr ctrl.Manager) error { Complete(r) } -func (h KafkaSchemaHandler) createOrUpdate(ctx context.Context, avn *aiven.Client, avnGen avngen.Client, obj client.Object, refs []client.Object) error { +func (h KafkaSchemaHandler) createOrUpdate(ctx context.Context, avn *aiven.Client, avnGen avngen.Client, obj client.Object, _ []client.Object) error { schema, err := h.convert(obj) if err != nil { return err } - // createOrUpdate Kafka Schema Subject - _, err = avn.KafkaSubjectSchemas.Add( + // By some reason it must call ServiceSchemaRegistrySubjectVersionsGet before ServiceSchemaRegistrySubjectVersionPost + // Otherwise ServiceSchemaRegistrySubjectVersionPost fails with an unknown error + _, err = avnGen.ServiceSchemaRegistrySubjectVersionsGet( ctx, schema.Spec.Project, schema.Spec.ServiceName, schema.Spec.SubjectName, - aiven.KafkaSchemaSubject{ - Schema: schema.Spec.Schema, + ) + if err != nil && !isNotFound(err) { + return err + } + + versionID, err := avnGen.ServiceSchemaRegistrySubjectVersionPost( + ctx, + schema.Spec.Project, + schema.Spec.ServiceName, + schema.Spec.SubjectName, + &kafkaschemaregistry.ServiceSchemaRegistrySubjectVersionPostIn{ + Schema: schema.Spec.Schema, + SchemaType: kafkaschemaregistry.SchemaType(schema.Spec.SchemaType), }, ) if err != nil { @@ -65,26 +80,46 @@ func (h KafkaSchemaHandler) createOrUpdate(ctx context.Context, avn *aiven.Clien // set compatibility level if defined for a newly created Kafka Schema Subject if schema.Spec.CompatibilityLevel != "" { - _, err := avn.KafkaSubjectSchemas.UpdateConfiguration( + _, err := avnGen.ServiceSchemaRegistrySubjectConfigPut( ctx, schema.Spec.Project, schema.Spec.ServiceName, schema.Spec.SubjectName, - schema.Spec.CompatibilityLevel, + &kafkaschemaregistry.ServiceSchemaRegistrySubjectConfigPutIn{ + Compatibility: kafkaschemaregistry.CompatibilityType(schema.Spec.CompatibilityLevel), + }, ) if err != nil { return fmt.Errorf("cannot update Kafka Schema Configuration: %w", err) } } - // get last version - version, err := h.getLastVersion(ctx, avn, schema) + // Gets the last version + // Because of eventual consistency, we must poll the subject + const ( + pollDelay = 5 * time.Second + pollAttempts = 10 + ) + err = retry.Do( + func() error { + version, err := avnGen.ServiceSchemaRegistrySubjectVersionGet(ctx, schema.Spec.Project, schema.Spec.ServiceName, schema.Spec.SubjectName, versionID) + if err != nil { + return err + } + + schema.Status.Version = version.Version + return nil + }, + retry.Context(ctx), + retry.RetryIf(isNotFound), + retry.Delay(pollDelay), + retry.Attempts(pollAttempts), + ) + if err != nil { return fmt.Errorf("cannot get Kafka Schema Subject version: %w", err) } - schema.Status.Version = version - meta.SetStatusCondition(&schema.Status.Conditions, getInitializedCondition("Added", "Successfully created or updated the instance in Aiven")) @@ -105,12 +140,8 @@ func (h KafkaSchemaHandler) delete(ctx context.Context, avn *aiven.Client, avnGe return false, err } - err = avn.KafkaSubjectSchemas.Delete(ctx, schema.Spec.Project, schema.Spec.ServiceName, schema.Spec.SubjectName) - if err != nil && !isNotFound(err) { - return false, fmt.Errorf("aiven client delete Kafka Schema error: %w", err) - } - - return true, nil + err = avnGen.ServiceSchemaRegistrySubjectDelete(ctx, schema.Spec.Project, schema.Spec.ServiceName, schema.Spec.SubjectName) + return isDeleted(err) } func (h KafkaSchemaHandler) get(ctx context.Context, avn *aiven.Client, avnGen avngen.Client, obj client.Object) (*corev1.Secret, error) { @@ -145,23 +176,3 @@ func (h KafkaSchemaHandler) convert(i client.Object) (*v1alpha1.KafkaSchema, err return schema, nil } - -func (h KafkaSchemaHandler) getLastVersion(ctx context.Context, avn *aiven.Client, schema *v1alpha1.KafkaSchema) (int, error) { - ver, err := avn.KafkaSubjectSchemas.GetVersions( - ctx, - schema.Spec.Project, - schema.Spec.ServiceName, - schema.Spec.SubjectName) - if err != nil { - return 0, err - } - - var latestVersion int - for _, v := range ver.Versions { - if v > latestVersion { - latestVersion = v - } - } - - return latestVersion, nil -} diff --git a/controllers/kafkatopic_controller.go b/controllers/kafkatopic_controller.go index 4284ec34..121a0694 100644 --- a/controllers/kafkatopic_controller.go +++ b/controllers/kafkatopic_controller.go @@ -9,7 +9,6 @@ import ( "github.com/aiven/aiven-go-client/v2" avngen "github.com/aiven/go-client-codegen" - "github.com/aiven/go-client-codegen/handler/service" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -175,24 +174,9 @@ func (h KafkaTopicHandler) checkPreconditions(ctx context.Context, avn *aiven.Cl meta.SetStatusCondition(&topic.Status.Conditions, getInitializedCondition("Preconditions", "Checking preconditions")) - s, err := avnGen.ServiceGet(ctx, topic.Spec.Project, topic.Spec.ServiceName) - if isNotFound(err) { - return false, nil - } - - if err != nil { - return false, err - } - - running := 0 - for _, node := range s.NodeStates { - if node.State == service.NodeStateTypeRunning { - running++ - } - } // Replication factor requires enough nodes running. // But we want to get the backend validation error if the value is too high - return running >= min(len(s.NodeStates), topic.Spec.Replication), nil + return minNodesRunning(ctx, avnGen, topic.Spec.Project, topic.Spec.ServiceName, topic.Spec.Replication) } func (h KafkaTopicHandler) getState(ctx context.Context, avn *aiven.Client, topic *v1alpha1.KafkaTopic) (string, error) { diff --git a/docs/docs/api-reference/kafkaschema.md b/docs/docs/api-reference/kafkaschema.md index f2ac1a57..537b10a3 100644 --- a/docs/docs/api-reference/kafkaschema.md +++ b/docs/docs/api-reference/kafkaschema.md @@ -84,6 +84,7 @@ KafkaSchemaSpec defines the desired state of KafkaSchema. - [`authSecretRef`](#spec.authSecretRef-property){: name='spec.authSecretRef-property'} (object). Authentication reference to Aiven token in a secret. See below for [nested schema](#spec.authSecretRef). - [`compatibilityLevel`](#spec.compatibilityLevel-property){: name='spec.compatibilityLevel-property'} (string, Enum: `BACKWARD`, `BACKWARD_TRANSITIVE`, `FORWARD`, `FORWARD_TRANSITIVE`, `FULL`, `FULL_TRANSITIVE`, `NONE`). Kafka Schemas compatibility level. +- [`schemaType`](#spec.schemaType-property){: name='spec.schemaType-property'} (string, Enum: `AVRO`, `JSON`, `PROTOBUF`). Schema type. ## authSecretRef {: #spec.authSecretRef } diff --git a/go.mod b/go.mod index f4342998..59fccaa4 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,8 @@ require ( github.com/ClickHouse/clickhouse-go/v2 v2.26.0 github.com/aiven/aiven-go-client/v2 v2.24.0 github.com/aiven/go-api-schemas v1.79.0 - github.com/aiven/go-client-codegen v0.18.0 + github.com/aiven/go-client-codegen v0.18.1-0.20240725111224-58ef232e2930 + github.com/avast/retry-go v3.0.0+incompatible github.com/dave/jennifer v1.7.0 github.com/docker/go-units v0.5.0 github.com/ghodss/yaml v1.0.0 diff --git a/go.sum b/go.sum index bdb36047..d62df5ea 100644 --- a/go.sum +++ b/go.sum @@ -41,8 +41,8 @@ github.com/aiven/aiven-go-client/v2 v2.24.0 h1:Ce9xXuTqs8EsjD0Vj67qhMnnYV/9UXzFL github.com/aiven/aiven-go-client/v2 v2.24.0/go.mod h1:KdHfLIlIRZIfCSEBd39j1Q81jlSb6Nd+oCQKqERfnuA= github.com/aiven/go-api-schemas v1.79.0 h1:V6H7XKbsgfwWWLBazj53ZiQygCZdMB9os2ZP5wvgzIw= github.com/aiven/go-api-schemas v1.79.0/go.mod h1:FYR22WcKLisZ1CYqyyGk6XqNSyxfAUtaQd+P2ydwc5A= -github.com/aiven/go-client-codegen v0.18.0 h1:Bpt/++DojYTFsgz0iMuYsQSa0TOQkZ8JSQBQLPvUQus= -github.com/aiven/go-client-codegen v0.18.0/go.mod h1:Sajbdpjn1/m5g2D6EDfiSnxl9pj9hxe8+hpG1CkCkhs= +github.com/aiven/go-client-codegen v0.18.1-0.20240725111224-58ef232e2930 h1:bTT29IMhYqpFM9fa5ZzE8RhgW4Zz5FBj3WxNK0bMrDI= +github.com/aiven/go-client-codegen v0.18.1-0.20240725111224-58ef232e2930/go.mod h1:Sajbdpjn1/m5g2D6EDfiSnxl9pj9hxe8+hpG1CkCkhs= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= @@ -50,6 +50,8 @@ github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRF github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho= github.com/andybalholm/brotli v1.1.0 h1:eLKJA0d02Lf0mVpIDgYnqXcUn0GqVmEFny3VuID1U3M= github.com/andybalholm/brotli v1.1.0/go.mod h1:sms7XGricyQI9K10gOSf56VKKWS4oLer58Q+mhRPtnY= +github.com/avast/retry-go v3.0.0+incompatible h1:4SOWQ7Qs+oroOTQOYnAHqelpCO0biHSxpiH9JdtuBj0= +github.com/avast/retry-go v3.0.0+incompatible/go.mod h1:XtSnn+n/sHqQIpZ10K1qAevBhOOCWBLXXy3hyiqqBrY= github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=