Skip to content

Commit

Permalink
fix(kafkaschema): poll 404 (#797)
Browse files Browse the repository at this point in the history
  • Loading branch information
byashimov authored Sep 17, 2024
1 parent 3f5a72f commit 7e2cc9b
Show file tree
Hide file tree
Showing 9 changed files with 92 additions and 30 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

- Fix `KafkaTopic`: fails to create a topic with the replication factor set more than running Kafka nodes
- Fix `ServiceIntegration`: sends empty source and destination projects
- Fix `KafkaSchema`: poll resource availability
- Add `KafkaSchema` field `schemaType`, type `string`: Schema type
- Add `Kafka` field `userConfig.follower_fetching`, type `object`: Enable follower fetching
- Change `Kafka` field `userConfig.kafka.sasl_oauthbearer_sub_claim_name`: pattern ~~`^[^\r\n]*$`~~
`^[^\r\n]*\S[^\r\n]*$`
Expand Down
7 changes: 6 additions & 1 deletion api/v1alpha1/kafkaschema_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
package v1alpha1

import (
"github.com/aiven/go-client-codegen/handler/kafkaschemaregistry"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

Expand All @@ -17,9 +18,13 @@ type KafkaSchemaSpec struct {
// Kafka Schema configuration should be a valid Avro Schema JSON format
Schema string `json:"schema"`

// +kubebuilder:validation:Enum=AVRO;JSON;PROTOBUF
// Schema type
SchemaType kafkaschemaregistry.SchemaType `json:"schemaType,omitempty"`

// +kubebuilder:validation:Enum=BACKWARD;BACKWARD_TRANSITIVE;FORWARD;FORWARD_TRANSITIVE;FULL;FULL_TRANSITIVE;NONE
// Kafka Schemas compatibility level
CompatibilityLevel string `json:"compatibilityLevel,omitempty"`
CompatibilityLevel kafkaschemaregistry.CompatibilityType `json:"compatibilityLevel,omitempty"`
}

// KafkaSchemaStatus defines the observed state of KafkaSchema
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,13 @@ spec:
Kafka Schema configuration should be a valid Avro Schema
JSON format
type: string
schemaType:
description: Schema type
enum:
- AVRO
- JSON
- PROTOBUF
type: string
serviceName:
description:
Specifies the name of the service that this resource
Expand Down
7 changes: 7 additions & 0 deletions config/crd/bases/aiven.io_kafkaschemas.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,13 @@ spec:
Kafka Schema configuration should be a valid Avro Schema
JSON format
type: string
schemaType:
description: Schema type
enum:
- AVRO
- JSON
- PROTOBUF
type: string
serviceName:
description:
Specifies the name of the service that this resource
Expand Down
90 changes: 62 additions & 28 deletions controllers/kafkaschema_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,12 @@ import (
"context"
"fmt"
"strconv"
"time"

"github.com/aiven/aiven-go-client/v2"
avngen "github.com/aiven/go-client-codegen"
"github.com/aiven/go-client-codegen/handler/kafkaschemaregistry"
"github.com/avast/retry-go"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -43,20 +46,32 @@ func (r *KafkaSchemaReconciler) SetupWithManager(mgr ctrl.Manager) error {
Complete(r)
}

func (h KafkaSchemaHandler) createOrUpdate(ctx context.Context, avn *aiven.Client, avnGen avngen.Client, obj client.Object, refs []client.Object) error {
func (h KafkaSchemaHandler) createOrUpdate(ctx context.Context, avn *aiven.Client, avnGen avngen.Client, obj client.Object, _ []client.Object) error {
schema, err := h.convert(obj)
if err != nil {
return err
}

// createOrUpdate Kafka Schema Subject
_, err = avn.KafkaSubjectSchemas.Add(
// Must poll kafka until the registry ready.
// The client retries errors under the hood.
_, err = avnGen.ServiceSchemaRegistrySubjectVersionsGet(
ctx,
schema.Spec.Project,
schema.Spec.ServiceName,
schema.Spec.SubjectName,
aiven.KafkaSchemaSubject{
Schema: schema.Spec.Schema,
)
if err != nil && !isNotFound(err) {
return err
}

schemaID, err := avnGen.ServiceSchemaRegistrySubjectVersionPost(
ctx,
schema.Spec.Project,
schema.Spec.ServiceName,
schema.Spec.SubjectName,
&kafkaschemaregistry.ServiceSchemaRegistrySubjectVersionPostIn{
Schema: schema.Spec.Schema,
SchemaType: schema.Spec.SchemaType,
},
)
if err != nil {
Expand All @@ -65,26 +80,49 @@ func (h KafkaSchemaHandler) createOrUpdate(ctx context.Context, avn *aiven.Clien

// set compatibility level if defined for a newly created Kafka Schema Subject
if schema.Spec.CompatibilityLevel != "" {
_, err := avn.KafkaSubjectSchemas.UpdateConfiguration(
_, err := avnGen.ServiceSchemaRegistrySubjectConfigPut(
ctx,
schema.Spec.Project,
schema.Spec.ServiceName,
schema.Spec.SubjectName,
schema.Spec.CompatibilityLevel,
&kafkaschemaregistry.ServiceSchemaRegistrySubjectConfigPutIn{
Compatibility: schema.Spec.CompatibilityLevel,
},
)
if err != nil {
return fmt.Errorf("cannot update Kafka Schema Configuration: %w", err)
}
}

// get last version
version, err := h.getLastVersion(ctx, avn, schema)
// Gets the last version
// Because of eventual consistency, we must poll the subject
const (
pollDelay = 5 * time.Second
pollAttempts = 10
)

err = retry.Do(
func() error {
version, err := getKafkaSchemaVersion(
ctx,
avnGen,
schemaID,
schema.Spec.Project,
schema.Spec.ServiceName,
schema.Spec.SubjectName,
)
schema.Status.Version = version
return err
},
retry.Context(ctx),
retry.RetryIf(isNotFound),
retry.Delay(pollDelay),
retry.Attempts(pollAttempts),
)
if err != nil {
return fmt.Errorf("cannot get Kafka Schema Subject version: %w", err)
}

schema.Status.Version = version

meta.SetStatusCondition(&schema.Status.Conditions,
getInitializedCondition("Added",
"Successfully created or updated the instance in Aiven"))
Expand All @@ -105,12 +143,8 @@ func (h KafkaSchemaHandler) delete(ctx context.Context, avn *aiven.Client, avnGe
return false, err
}

err = avn.KafkaSubjectSchemas.Delete(ctx, schema.Spec.Project, schema.Spec.ServiceName, schema.Spec.SubjectName)
if err != nil && !isNotFound(err) {
return false, fmt.Errorf("aiven client delete Kafka Schema error: %w", err)
}

return true, nil
err = avnGen.ServiceSchemaRegistrySubjectDelete(ctx, schema.Spec.Project, schema.Spec.ServiceName, schema.Spec.SubjectName)
return isDeleted(err)
}

func (h KafkaSchemaHandler) get(ctx context.Context, avn *aiven.Client, avnGen avngen.Client, obj client.Object) (*corev1.Secret, error) {
Expand Down Expand Up @@ -146,22 +180,22 @@ func (h KafkaSchemaHandler) convert(i client.Object) (*v1alpha1.KafkaSchema, err
return schema, nil
}

func (h KafkaSchemaHandler) getLastVersion(ctx context.Context, avn *aiven.Client, schema *v1alpha1.KafkaSchema) (int, error) {
ver, err := avn.KafkaSubjectSchemas.GetVersions(
ctx,
schema.Spec.Project,
schema.Spec.ServiceName,
schema.Spec.SubjectName)
func getKafkaSchemaVersion(ctx context.Context, client avngen.Client, schemaID int, projectName, serviceName, subjectName string) (int, error) {
versions, err := client.ServiceSchemaRegistrySubjectVersionsGet(ctx, projectName, serviceName, subjectName)
if err != nil {
return 0, err
}

var latestVersion int
for _, v := range ver.Versions {
if v > latestVersion {
latestVersion = v
for _, v := range versions {
version, err := client.ServiceSchemaRegistrySubjectVersionGet(ctx, projectName, serviceName, subjectName, v)
if err != nil {
return 0, err
}

if version.Id == schemaID {
return version.Version, nil
}
}

return latestVersion, nil
return 0, NewNotFound(fmt.Sprintf("the schema with id %d not found", schemaID))
}
1 change: 1 addition & 0 deletions docs/docs/api-reference/kafkaschema.md
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ KafkaSchemaSpec defines the desired state of KafkaSchema.

- [`authSecretRef`](#spec.authSecretRef-property){: name='spec.authSecretRef-property'} (object). Authentication reference to Aiven token in a secret. See below for [nested schema](#spec.authSecretRef).
- [`compatibilityLevel`](#spec.compatibilityLevel-property){: name='spec.compatibilityLevel-property'} (string, Enum: `BACKWARD`, `BACKWARD_TRANSITIVE`, `FORWARD`, `FORWARD_TRANSITIVE`, `FULL`, `FULL_TRANSITIVE`, `NONE`). Kafka Schemas compatibility level.
- [`schemaType`](#spec.schemaType-property){: name='spec.schemaType-property'} (string, Enum: `AVRO`, `JSON`, `PROTOBUF`). Schema type.

## authSecretRef {: #spec.authSecretRef }

Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ require (
github.com/aiven/aiven-go-client/v2 v2.26.0
github.com/aiven/go-api-schemas v1.85.0
github.com/aiven/go-client-codegen v0.31.0
github.com/avast/retry-go v3.0.0+incompatible
github.com/dave/jennifer v1.7.1
github.com/docker/go-units v0.5.0
github.com/ghodss/yaml v1.0.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRF
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho=
github.com/andybalholm/brotli v1.1.0 h1:eLKJA0d02Lf0mVpIDgYnqXcUn0GqVmEFny3VuID1U3M=
github.com/andybalholm/brotli v1.1.0/go.mod h1:sms7XGricyQI9K10gOSf56VKKWS4oLer58Q+mhRPtnY=
github.com/avast/retry-go v3.0.0+incompatible h1:4SOWQ7Qs+oroOTQOYnAHqelpCO0biHSxpiH9JdtuBj0=
github.com/avast/retry-go v3.0.0+incompatible/go.mod h1:XtSnn+n/sHqQIpZ10K1qAevBhOOCWBLXXy3hyiqqBrY=
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
Expand Down
5 changes: 4 additions & 1 deletion tests/kafkaschema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"testing"

"github.com/aiven/go-client-codegen/handler/kafkaschemaregistry"
"github.com/google/go-cmp/cmp"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -43,6 +44,7 @@ spec:
project: %[1]s
serviceName: %[2]s
subjectName: %[4]s
schemaType: AVRO
compatibilityLevel: BACKWARD
schema: |
{
Expand Down Expand Up @@ -104,7 +106,8 @@ func TestKafkaSchema(t *testing.T) {
assert.Equal(t, schemaName, schema.Name)
assert.Equal(t, subjectName, schema.Spec.SubjectName)
assert.Equal(t, kafkaName, schema.Spec.ServiceName)
assert.Equal(t, "BACKWARD", schema.Spec.CompatibilityLevel)
assert.Equal(t, kafkaschemaregistry.SchemaTypeAvro, schema.Spec.SchemaType)
assert.Equal(t, kafkaschemaregistry.CompatibilityTypeBackward, schema.Spec.CompatibilityLevel)

type schemaType struct {
Default any `json:"default,omitempty"`
Expand Down

0 comments on commit 7e2cc9b

Please sign in to comment.