diff --git a/CHANGELOG.md b/CHANGELOG.md index 63eb9db9..e5c66e8f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,8 @@ - Fix `KafkaTopic`: fails to create a topic with the replication factor set more than running Kafka nodes - Fix `ServiceIntegration`: sends empty source and destination projects +- Fix `KafkaSchema`: poll resource availability +- Add `KafkaSchema` field `schemaType`, type `string`: Schema type ## v0.24.0 - 2024-07-16 diff --git a/api/v1alpha1/kafkaschema_types.go b/api/v1alpha1/kafkaschema_types.go index aa958d45..f9ac409f 100644 --- a/api/v1alpha1/kafkaschema_types.go +++ b/api/v1alpha1/kafkaschema_types.go @@ -17,6 +17,10 @@ type KafkaSchemaSpec struct { // Kafka Schema configuration should be a valid Avro Schema JSON format Schema string `json:"schema"` + // +kubebuilder:validation:Enum=AVRO;JSON;PROTOBUF + // Schema type + SchemaType string `json:"schemaType,omitempty"` + // +kubebuilder:validation:Enum=BACKWARD;BACKWARD_TRANSITIVE;FORWARD;FORWARD_TRANSITIVE;FULL;FULL_TRANSITIVE;NONE // Kafka Schemas compatibility level CompatibilityLevel string `json:"compatibilityLevel,omitempty"` diff --git a/charts/aiven-operator-crds/templates/aiven.io_kafkaschemas.yaml b/charts/aiven-operator-crds/templates/aiven.io_kafkaschemas.yaml index d227a408..5e34946f 100644 --- a/charts/aiven-operator-crds/templates/aiven.io_kafkaschemas.yaml +++ b/charts/aiven-operator-crds/templates/aiven.io_kafkaschemas.yaml @@ -92,6 +92,13 @@ spec: Kafka Schema configuration should be a valid Avro Schema JSON format type: string + schemaType: + description: Schema type + enum: + - AVRO + - JSON + - PROTOBUF + type: string serviceName: description: Specifies the name of the service that this resource diff --git a/config/crd/bases/aiven.io_kafkaschemas.yaml b/config/crd/bases/aiven.io_kafkaschemas.yaml index d227a408..5e34946f 100644 --- a/config/crd/bases/aiven.io_kafkaschemas.yaml +++ b/config/crd/bases/aiven.io_kafkaschemas.yaml @@ -92,6 +92,13 @@ spec: Kafka Schema configuration should be a valid Avro Schema JSON format type: string + schemaType: + description: Schema type + enum: + - AVRO + - JSON + - PROTOBUF + type: string serviceName: description: Specifies the name of the service that this resource diff --git a/controllers/kafkaschema_controller.go b/controllers/kafkaschema_controller.go index d103a2d8..886f4633 100644 --- a/controllers/kafkaschema_controller.go +++ b/controllers/kafkaschema_controller.go @@ -6,9 +6,12 @@ import ( "context" "fmt" "strconv" + "time" "github.com/aiven/aiven-go-client/v2" avngen "github.com/aiven/go-client-codegen" + "github.com/aiven/go-client-codegen/handler/kafkaschemaregistry" + "github.com/avast/retry-go" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -43,20 +46,31 @@ func (r *KafkaSchemaReconciler) SetupWithManager(mgr ctrl.Manager) error { Complete(r) } -func (h KafkaSchemaHandler) createOrUpdate(ctx context.Context, avn *aiven.Client, avnGen avngen.Client, obj client.Object, refs []client.Object) error { +func (h KafkaSchemaHandler) createOrUpdate(ctx context.Context, avn *aiven.Client, avnGen avngen.Client, obj client.Object, _ []client.Object) error { schema, err := h.convert(obj) if err != nil { return err } - // createOrUpdate Kafka Schema Subject - _, err = avn.KafkaSubjectSchemas.Add( + // Must poll kafka until the registry ready + _, err = avnGen.ServiceSchemaRegistrySubjectVersionsGet( ctx, schema.Spec.Project, schema.Spec.ServiceName, schema.Spec.SubjectName, - aiven.KafkaSchemaSubject{ - Schema: schema.Spec.Schema, + ) + if err != nil && !isNotFound(err) { + return err + } + + schemaID, err := avnGen.ServiceSchemaRegistrySubjectVersionPost( + ctx, + schema.Spec.Project, + schema.Spec.ServiceName, + schema.Spec.SubjectName, + &kafkaschemaregistry.ServiceSchemaRegistrySubjectVersionPostIn{ + Schema: schema.Spec.Schema, + SchemaType: kafkaschemaregistry.SchemaType(schema.Spec.SchemaType), }, ) if err != nil { @@ -65,26 +79,56 @@ func (h KafkaSchemaHandler) createOrUpdate(ctx context.Context, avn *aiven.Clien // set compatibility level if defined for a newly created Kafka Schema Subject if schema.Spec.CompatibilityLevel != "" { - _, err := avn.KafkaSubjectSchemas.UpdateConfiguration( + _, err := avnGen.ServiceSchemaRegistrySubjectConfigPut( ctx, schema.Spec.Project, schema.Spec.ServiceName, schema.Spec.SubjectName, - schema.Spec.CompatibilityLevel, + &kafkaschemaregistry.ServiceSchemaRegistrySubjectConfigPutIn{ + Compatibility: kafkaschemaregistry.CompatibilityType(schema.Spec.CompatibilityLevel), + }, ) if err != nil { return fmt.Errorf("cannot update Kafka Schema Configuration: %w", err) } } - // get last version - version, err := h.getLastVersion(ctx, avn, schema) + // Gets the last version + // Because of eventual consistency, we must poll the subject + const ( + pollDelay = 5 * time.Second + pollAttempts = 10 + ) + + err = retry.Do( + func() error { + versions, err := avnGen.ServiceSchemaRegistrySubjectVersionsGet(ctx, schema.Spec.Project, schema.Spec.ServiceName, schema.Spec.SubjectName) + if err != nil { + return err + } + + for _, v := range versions { + version, err := avnGen.ServiceSchemaRegistrySubjectVersionGet(ctx, schema.Spec.Project, schema.Spec.ServiceName, schema.Spec.SubjectName, v) + if err != nil { + return err + } + + if version.Id == schemaID { + schema.Status.Version = version.Version + return nil + } + } + return NewNotFound(fmt.Sprintf("the schema with id %d not found", schemaID)) + }, + retry.Context(ctx), + retry.RetryIf(isNotFound), + retry.Delay(pollDelay), + retry.Attempts(pollAttempts), + ) if err != nil { return fmt.Errorf("cannot get Kafka Schema Subject version: %w", err) } - schema.Status.Version = version - meta.SetStatusCondition(&schema.Status.Conditions, getInitializedCondition("Added", "Successfully created or updated the instance in Aiven")) @@ -105,12 +149,8 @@ func (h KafkaSchemaHandler) delete(ctx context.Context, avn *aiven.Client, avnGe return false, err } - err = avn.KafkaSubjectSchemas.Delete(ctx, schema.Spec.Project, schema.Spec.ServiceName, schema.Spec.SubjectName) - if err != nil && !isNotFound(err) { - return false, fmt.Errorf("aiven client delete Kafka Schema error: %w", err) - } - - return true, nil + err = avnGen.ServiceSchemaRegistrySubjectDelete(ctx, schema.Spec.Project, schema.Spec.ServiceName, schema.Spec.SubjectName) + return isDeleted(err) } func (h KafkaSchemaHandler) get(ctx context.Context, avn *aiven.Client, avnGen avngen.Client, obj client.Object) (*corev1.Secret, error) { @@ -145,23 +185,3 @@ func (h KafkaSchemaHandler) convert(i client.Object) (*v1alpha1.KafkaSchema, err return schema, nil } - -func (h KafkaSchemaHandler) getLastVersion(ctx context.Context, avn *aiven.Client, schema *v1alpha1.KafkaSchema) (int, error) { - ver, err := avn.KafkaSubjectSchemas.GetVersions( - ctx, - schema.Spec.Project, - schema.Spec.ServiceName, - schema.Spec.SubjectName) - if err != nil { - return 0, err - } - - var latestVersion int - for _, v := range ver.Versions { - if v > latestVersion { - latestVersion = v - } - } - - return latestVersion, nil -} diff --git a/docs/docs/api-reference/kafkaschema.md b/docs/docs/api-reference/kafkaschema.md index f2ac1a57..537b10a3 100644 --- a/docs/docs/api-reference/kafkaschema.md +++ b/docs/docs/api-reference/kafkaschema.md @@ -84,6 +84,7 @@ KafkaSchemaSpec defines the desired state of KafkaSchema. - [`authSecretRef`](#spec.authSecretRef-property){: name='spec.authSecretRef-property'} (object). Authentication reference to Aiven token in a secret. See below for [nested schema](#spec.authSecretRef). - [`compatibilityLevel`](#spec.compatibilityLevel-property){: name='spec.compatibilityLevel-property'} (string, Enum: `BACKWARD`, `BACKWARD_TRANSITIVE`, `FORWARD`, `FORWARD_TRANSITIVE`, `FULL`, `FULL_TRANSITIVE`, `NONE`). Kafka Schemas compatibility level. +- [`schemaType`](#spec.schemaType-property){: name='spec.schemaType-property'} (string, Enum: `AVRO`, `JSON`, `PROTOBUF`). Schema type. ## authSecretRef {: #spec.authSecretRef } diff --git a/go.mod b/go.mod index f4342998..d3e5a4d9 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,8 @@ require ( github.com/ClickHouse/clickhouse-go/v2 v2.26.0 github.com/aiven/aiven-go-client/v2 v2.24.0 github.com/aiven/go-api-schemas v1.79.0 - github.com/aiven/go-client-codegen v0.18.0 + github.com/aiven/go-client-codegen v0.18.1-0.20240725162756-dcc0909158b6 + github.com/avast/retry-go v3.0.0+incompatible github.com/dave/jennifer v1.7.0 github.com/docker/go-units v0.5.0 github.com/ghodss/yaml v1.0.0 diff --git a/go.sum b/go.sum index bdb36047..25f66778 100644 --- a/go.sum +++ b/go.sum @@ -41,8 +41,8 @@ github.com/aiven/aiven-go-client/v2 v2.24.0 h1:Ce9xXuTqs8EsjD0Vj67qhMnnYV/9UXzFL github.com/aiven/aiven-go-client/v2 v2.24.0/go.mod h1:KdHfLIlIRZIfCSEBd39j1Q81jlSb6Nd+oCQKqERfnuA= github.com/aiven/go-api-schemas v1.79.0 h1:V6H7XKbsgfwWWLBazj53ZiQygCZdMB9os2ZP5wvgzIw= github.com/aiven/go-api-schemas v1.79.0/go.mod h1:FYR22WcKLisZ1CYqyyGk6XqNSyxfAUtaQd+P2ydwc5A= -github.com/aiven/go-client-codegen v0.18.0 h1:Bpt/++DojYTFsgz0iMuYsQSa0TOQkZ8JSQBQLPvUQus= -github.com/aiven/go-client-codegen v0.18.0/go.mod h1:Sajbdpjn1/m5g2D6EDfiSnxl9pj9hxe8+hpG1CkCkhs= +github.com/aiven/go-client-codegen v0.18.1-0.20240725162756-dcc0909158b6 h1:3I3iTg6aNp5JDLxmWj7ZLXI2McLxYpreK103Z7+ymbM= +github.com/aiven/go-client-codegen v0.18.1-0.20240725162756-dcc0909158b6/go.mod h1:Sajbdpjn1/m5g2D6EDfiSnxl9pj9hxe8+hpG1CkCkhs= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= @@ -50,6 +50,8 @@ github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRF github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho= github.com/andybalholm/brotli v1.1.0 h1:eLKJA0d02Lf0mVpIDgYnqXcUn0GqVmEFny3VuID1U3M= github.com/andybalholm/brotli v1.1.0/go.mod h1:sms7XGricyQI9K10gOSf56VKKWS4oLer58Q+mhRPtnY= +github.com/avast/retry-go v3.0.0+incompatible h1:4SOWQ7Qs+oroOTQOYnAHqelpCO0biHSxpiH9JdtuBj0= +github.com/avast/retry-go v3.0.0+incompatible/go.mod h1:XtSnn+n/sHqQIpZ10K1qAevBhOOCWBLXXy3hyiqqBrY= github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= diff --git a/tests/kafkaschema_test.go b/tests/kafkaschema_test.go index 7187b9a0..5d1efbff 100644 --- a/tests/kafkaschema_test.go +++ b/tests/kafkaschema_test.go @@ -76,7 +76,7 @@ func TestKafkaSchema(t *testing.T) { s := NewSession(ctx, k8sClient, cfg.Project) // Cleans test afterward - defer s.Destroy(t) + s.Destroy(t) // WHEN // Applies given manifest