diff --git a/CHANGELOG.md b/CHANGELOG.md index b81c4ce5..3ed77c02 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,8 @@ - Fix `KafkaTopic`: fails to create a topic with the replication factor set more than running Kafka nodes - Fix `ServiceIntegration`: sends empty source and destination projects +- Fix `KafkaSchema`: poll resource availability +- Add `KafkaSchema` field `schemaType`, type `string`: Schema type - Add `Kafka` field `userConfig.follower_fetching`, type `object`: Enable follower fetching - Change `Kafka` field `userConfig.kafka.sasl_oauthbearer_sub_claim_name`: pattern ~~`^[^\r\n]*$`~~ → `^[^\r\n]*\S[^\r\n]*$` diff --git a/api/v1alpha1/kafkaschema_types.go b/api/v1alpha1/kafkaschema_types.go index aa958d45..3e16972d 100644 --- a/api/v1alpha1/kafkaschema_types.go +++ b/api/v1alpha1/kafkaschema_types.go @@ -3,6 +3,7 @@ package v1alpha1 import ( + "github.com/aiven/go-client-codegen/handler/kafkaschemaregistry" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) @@ -17,9 +18,13 @@ type KafkaSchemaSpec struct { // Kafka Schema configuration should be a valid Avro Schema JSON format Schema string `json:"schema"` + // +kubebuilder:validation:Enum=AVRO;JSON;PROTOBUF + // Schema type + SchemaType kafkaschemaregistry.SchemaType `json:"schemaType,omitempty"` + // +kubebuilder:validation:Enum=BACKWARD;BACKWARD_TRANSITIVE;FORWARD;FORWARD_TRANSITIVE;FULL;FULL_TRANSITIVE;NONE // Kafka Schemas compatibility level - CompatibilityLevel string `json:"compatibilityLevel,omitempty"` + CompatibilityLevel kafkaschemaregistry.CompatibilityType `json:"compatibilityLevel,omitempty"` } // KafkaSchemaStatus defines the observed state of KafkaSchema diff --git a/charts/aiven-operator-crds/templates/aiven.io_kafkaschemas.yaml b/charts/aiven-operator-crds/templates/aiven.io_kafkaschemas.yaml index d227a408..5e34946f 100644 --- a/charts/aiven-operator-crds/templates/aiven.io_kafkaschemas.yaml +++ b/charts/aiven-operator-crds/templates/aiven.io_kafkaschemas.yaml @@ -92,6 +92,13 @@ spec: Kafka Schema configuration should be a valid Avro Schema JSON format type: string + schemaType: + description: Schema type + enum: + - AVRO + - JSON + - PROTOBUF + type: string serviceName: description: Specifies the name of the service that this resource diff --git a/config/crd/bases/aiven.io_kafkaschemas.yaml b/config/crd/bases/aiven.io_kafkaschemas.yaml index d227a408..5e34946f 100644 --- a/config/crd/bases/aiven.io_kafkaschemas.yaml +++ b/config/crd/bases/aiven.io_kafkaschemas.yaml @@ -92,6 +92,13 @@ spec: Kafka Schema configuration should be a valid Avro Schema JSON format type: string + schemaType: + description: Schema type + enum: + - AVRO + - JSON + - PROTOBUF + type: string serviceName: description: Specifies the name of the service that this resource diff --git a/controllers/kafkaschema_controller.go b/controllers/kafkaschema_controller.go index d103a2d8..f18f44a0 100644 --- a/controllers/kafkaschema_controller.go +++ b/controllers/kafkaschema_controller.go @@ -6,9 +6,12 @@ import ( "context" "fmt" "strconv" + "time" "github.com/aiven/aiven-go-client/v2" avngen "github.com/aiven/go-client-codegen" + "github.com/aiven/go-client-codegen/handler/kafkaschemaregistry" + "github.com/avast/retry-go" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -43,20 +46,32 @@ func (r *KafkaSchemaReconciler) SetupWithManager(mgr ctrl.Manager) error { Complete(r) } -func (h KafkaSchemaHandler) createOrUpdate(ctx context.Context, avn *aiven.Client, avnGen avngen.Client, obj client.Object, refs []client.Object) error { +func (h KafkaSchemaHandler) createOrUpdate(ctx context.Context, avn *aiven.Client, avnGen avngen.Client, obj client.Object, _ []client.Object) error { schema, err := h.convert(obj) if err != nil { return err } - // createOrUpdate Kafka Schema Subject - _, err = avn.KafkaSubjectSchemas.Add( + // Must poll kafka until the registry ready. + // The client retries errors under the hood. + _, err = avnGen.ServiceSchemaRegistrySubjectVersionsGet( ctx, schema.Spec.Project, schema.Spec.ServiceName, schema.Spec.SubjectName, - aiven.KafkaSchemaSubject{ - Schema: schema.Spec.Schema, + ) + if err != nil && !isNotFound(err) { + return err + } + + schemaID, err := avnGen.ServiceSchemaRegistrySubjectVersionPost( + ctx, + schema.Spec.Project, + schema.Spec.ServiceName, + schema.Spec.SubjectName, + &kafkaschemaregistry.ServiceSchemaRegistrySubjectVersionPostIn{ + Schema: schema.Spec.Schema, + SchemaType: schema.Spec.SchemaType, }, ) if err != nil { @@ -65,26 +80,49 @@ func (h KafkaSchemaHandler) createOrUpdate(ctx context.Context, avn *aiven.Clien // set compatibility level if defined for a newly created Kafka Schema Subject if schema.Spec.CompatibilityLevel != "" { - _, err := avn.KafkaSubjectSchemas.UpdateConfiguration( + _, err := avnGen.ServiceSchemaRegistrySubjectConfigPut( ctx, schema.Spec.Project, schema.Spec.ServiceName, schema.Spec.SubjectName, - schema.Spec.CompatibilityLevel, + &kafkaschemaregistry.ServiceSchemaRegistrySubjectConfigPutIn{ + Compatibility: schema.Spec.CompatibilityLevel, + }, ) if err != nil { return fmt.Errorf("cannot update Kafka Schema Configuration: %w", err) } } - // get last version - version, err := h.getLastVersion(ctx, avn, schema) + // Gets the last version + // Because of eventual consistency, we must poll the subject + const ( + pollDelay = 5 * time.Second + pollAttempts = 10 + ) + + err = retry.Do( + func() error { + version, err := getKafkaSchemaVersion( + ctx, + avnGen, + schemaID, + schema.Spec.Project, + schema.Spec.ServiceName, + schema.Spec.SubjectName, + ) + schema.Status.Version = version + return err + }, + retry.Context(ctx), + retry.RetryIf(isNotFound), + retry.Delay(pollDelay), + retry.Attempts(pollAttempts), + ) if err != nil { return fmt.Errorf("cannot get Kafka Schema Subject version: %w", err) } - schema.Status.Version = version - meta.SetStatusCondition(&schema.Status.Conditions, getInitializedCondition("Added", "Successfully created or updated the instance in Aiven")) @@ -105,12 +143,8 @@ func (h KafkaSchemaHandler) delete(ctx context.Context, avn *aiven.Client, avnGe return false, err } - err = avn.KafkaSubjectSchemas.Delete(ctx, schema.Spec.Project, schema.Spec.ServiceName, schema.Spec.SubjectName) - if err != nil && !isNotFound(err) { - return false, fmt.Errorf("aiven client delete Kafka Schema error: %w", err) - } - - return true, nil + err = avnGen.ServiceSchemaRegistrySubjectDelete(ctx, schema.Spec.Project, schema.Spec.ServiceName, schema.Spec.SubjectName) + return isDeleted(err) } func (h KafkaSchemaHandler) get(ctx context.Context, avn *aiven.Client, avnGen avngen.Client, obj client.Object) (*corev1.Secret, error) { @@ -146,22 +180,22 @@ func (h KafkaSchemaHandler) convert(i client.Object) (*v1alpha1.KafkaSchema, err return schema, nil } -func (h KafkaSchemaHandler) getLastVersion(ctx context.Context, avn *aiven.Client, schema *v1alpha1.KafkaSchema) (int, error) { - ver, err := avn.KafkaSubjectSchemas.GetVersions( - ctx, - schema.Spec.Project, - schema.Spec.ServiceName, - schema.Spec.SubjectName) +func getKafkaSchemaVersion(ctx context.Context, client avngen.Client, schemaID int, projectName, serviceName, subjectName string) (int, error) { + versions, err := client.ServiceSchemaRegistrySubjectVersionsGet(ctx, projectName, serviceName, subjectName) if err != nil { return 0, err } - var latestVersion int - for _, v := range ver.Versions { - if v > latestVersion { - latestVersion = v + for _, v := range versions { + version, err := client.ServiceSchemaRegistrySubjectVersionGet(ctx, projectName, serviceName, subjectName, v) + if err != nil { + return 0, err + } + + if version.Id == schemaID { + return version.Version, nil } } - return latestVersion, nil + return 0, NewNotFound(fmt.Sprintf("the schema with id %d not found", schemaID)) } diff --git a/docs/docs/api-reference/kafkaschema.md b/docs/docs/api-reference/kafkaschema.md index f2ac1a57..537b10a3 100644 --- a/docs/docs/api-reference/kafkaschema.md +++ b/docs/docs/api-reference/kafkaschema.md @@ -84,6 +84,7 @@ KafkaSchemaSpec defines the desired state of KafkaSchema. - [`authSecretRef`](#spec.authSecretRef-property){: name='spec.authSecretRef-property'} (object). Authentication reference to Aiven token in a secret. See below for [nested schema](#spec.authSecretRef). - [`compatibilityLevel`](#spec.compatibilityLevel-property){: name='spec.compatibilityLevel-property'} (string, Enum: `BACKWARD`, `BACKWARD_TRANSITIVE`, `FORWARD`, `FORWARD_TRANSITIVE`, `FULL`, `FULL_TRANSITIVE`, `NONE`). Kafka Schemas compatibility level. +- [`schemaType`](#spec.schemaType-property){: name='spec.schemaType-property'} (string, Enum: `AVRO`, `JSON`, `PROTOBUF`). Schema type. ## authSecretRef {: #spec.authSecretRef } diff --git a/go.mod b/go.mod index e3f071dc..3e1f7453 100644 --- a/go.mod +++ b/go.mod @@ -7,6 +7,7 @@ require ( github.com/aiven/aiven-go-client/v2 v2.26.0 github.com/aiven/go-api-schemas v1.85.0 github.com/aiven/go-client-codegen v0.31.0 + github.com/avast/retry-go v3.0.0+incompatible github.com/dave/jennifer v1.7.1 github.com/docker/go-units v0.5.0 github.com/ghodss/yaml v1.0.0 diff --git a/go.sum b/go.sum index 155bb06f..c3080a6a 100644 --- a/go.sum +++ b/go.sum @@ -50,6 +50,8 @@ github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRF github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho= github.com/andybalholm/brotli v1.1.0 h1:eLKJA0d02Lf0mVpIDgYnqXcUn0GqVmEFny3VuID1U3M= github.com/andybalholm/brotli v1.1.0/go.mod h1:sms7XGricyQI9K10gOSf56VKKWS4oLer58Q+mhRPtnY= +github.com/avast/retry-go v3.0.0+incompatible h1:4SOWQ7Qs+oroOTQOYnAHqelpCO0biHSxpiH9JdtuBj0= +github.com/avast/retry-go v3.0.0+incompatible/go.mod h1:XtSnn+n/sHqQIpZ10K1qAevBhOOCWBLXXy3hyiqqBrY= github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= diff --git a/tests/kafkaschema_test.go b/tests/kafkaschema_test.go index 80f4d46b..c6ca4e8d 100644 --- a/tests/kafkaschema_test.go +++ b/tests/kafkaschema_test.go @@ -5,6 +5,7 @@ import ( "fmt" "testing" + "github.com/aiven/go-client-codegen/handler/kafkaschemaregistry" "github.com/google/go-cmp/cmp" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -43,6 +44,7 @@ spec: project: %[1]s serviceName: %[2]s subjectName: %[4]s + schemaType: AVRO compatibilityLevel: BACKWARD schema: | { @@ -104,7 +106,8 @@ func TestKafkaSchema(t *testing.T) { assert.Equal(t, schemaName, schema.Name) assert.Equal(t, subjectName, schema.Spec.SubjectName) assert.Equal(t, kafkaName, schema.Spec.ServiceName) - assert.Equal(t, "BACKWARD", schema.Spec.CompatibilityLevel) + assert.Equal(t, kafkaschemaregistry.SchemaTypeAvro, schema.Spec.SchemaType) + assert.Equal(t, kafkaschemaregistry.CompatibilityTypeBackward, schema.Spec.CompatibilityLevel) type schemaType struct { Default any `json:"default,omitempty"`