Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(kafkaschema): poll 404 #797

Merged
merged 1 commit into from
Sep 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

- Fix `KafkaTopic`: fails to create a topic with the replication factor set more than running Kafka nodes
- Fix `ServiceIntegration`: sends empty source and destination projects
- Fix `KafkaSchema`: poll resource availability
- Add `KafkaSchema` field `schemaType`, type `string`: Schema type
- Add `Kafka` field `userConfig.follower_fetching`, type `object`: Enable follower fetching
- Change `Kafka` field `userConfig.kafka.sasl_oauthbearer_sub_claim_name`: pattern ~~`^[^\r\n]*$`~~ →
`^[^\r\n]*\S[^\r\n]*$`
Expand Down
7 changes: 6 additions & 1 deletion api/v1alpha1/kafkaschema_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
package v1alpha1

import (
"github.com/aiven/go-client-codegen/handler/kafkaschemaregistry"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

Expand All @@ -17,9 +18,13 @@ type KafkaSchemaSpec struct {
// Kafka Schema configuration should be a valid Avro Schema JSON format
Schema string `json:"schema"`

// +kubebuilder:validation:Enum=AVRO;JSON;PROTOBUF
// Schema type
SchemaType kafkaschemaregistry.SchemaType `json:"schemaType,omitempty"`

// +kubebuilder:validation:Enum=BACKWARD;BACKWARD_TRANSITIVE;FORWARD;FORWARD_TRANSITIVE;FULL;FULL_TRANSITIVE;NONE
// Kafka Schemas compatibility level
CompatibilityLevel string `json:"compatibilityLevel,omitempty"`
CompatibilityLevel kafkaschemaregistry.CompatibilityType `json:"compatibilityLevel,omitempty"`
}

// KafkaSchemaStatus defines the observed state of KafkaSchema
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,13 @@ spec:
Kafka Schema configuration should be a valid Avro Schema
JSON format
type: string
schemaType:
description: Schema type
enum:
- AVRO
- JSON
- PROTOBUF
type: string
serviceName:
description:
Specifies the name of the service that this resource
Expand Down
7 changes: 7 additions & 0 deletions config/crd/bases/aiven.io_kafkaschemas.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,13 @@ spec:
Kafka Schema configuration should be a valid Avro Schema
JSON format
type: string
schemaType:
description: Schema type
enum:
- AVRO
- JSON
- PROTOBUF
type: string
serviceName:
description:
Specifies the name of the service that this resource
Expand Down
90 changes: 62 additions & 28 deletions controllers/kafkaschema_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,12 @@ import (
"context"
"fmt"
"strconv"
"time"

"github.com/aiven/aiven-go-client/v2"
avngen "github.com/aiven/go-client-codegen"
"github.com/aiven/go-client-codegen/handler/kafkaschemaregistry"
"github.com/avast/retry-go"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -43,20 +46,32 @@ func (r *KafkaSchemaReconciler) SetupWithManager(mgr ctrl.Manager) error {
Complete(r)
}

func (h KafkaSchemaHandler) createOrUpdate(ctx context.Context, avn *aiven.Client, avnGen avngen.Client, obj client.Object, refs []client.Object) error {
func (h KafkaSchemaHandler) createOrUpdate(ctx context.Context, avn *aiven.Client, avnGen avngen.Client, obj client.Object, _ []client.Object) error {
schema, err := h.convert(obj)
if err != nil {
return err
}

// createOrUpdate Kafka Schema Subject
_, err = avn.KafkaSubjectSchemas.Add(
// Must poll kafka until the registry ready.
// The client retries errors under the hood.
_, err = avnGen.ServiceSchemaRegistrySubjectVersionsGet(
ctx,
schema.Spec.Project,
schema.Spec.ServiceName,
schema.Spec.SubjectName,
aiven.KafkaSchemaSubject{
Schema: schema.Spec.Schema,
)
if err != nil && !isNotFound(err) {
return err
}

schemaID, err := avnGen.ServiceSchemaRegistrySubjectVersionPost(
ctx,
schema.Spec.Project,
schema.Spec.ServiceName,
schema.Spec.SubjectName,
&kafkaschemaregistry.ServiceSchemaRegistrySubjectVersionPostIn{
Schema: schema.Spec.Schema,
SchemaType: schema.Spec.SchemaType,
},
)
if err != nil {
Expand All @@ -65,26 +80,49 @@ func (h KafkaSchemaHandler) createOrUpdate(ctx context.Context, avn *aiven.Clien

// set compatibility level if defined for a newly created Kafka Schema Subject
if schema.Spec.CompatibilityLevel != "" {
_, err := avn.KafkaSubjectSchemas.UpdateConfiguration(
_, err := avnGen.ServiceSchemaRegistrySubjectConfigPut(
ctx,
schema.Spec.Project,
schema.Spec.ServiceName,
schema.Spec.SubjectName,
schema.Spec.CompatibilityLevel,
&kafkaschemaregistry.ServiceSchemaRegistrySubjectConfigPutIn{
Compatibility: schema.Spec.CompatibilityLevel,
},
)
if err != nil {
return fmt.Errorf("cannot update Kafka Schema Configuration: %w", err)
}
}

// get last version
version, err := h.getLastVersion(ctx, avn, schema)
// Gets the last version
// Because of eventual consistency, we must poll the subject
const (
pollDelay = 5 * time.Second
pollAttempts = 10
)

err = retry.Do(
func() error {
version, err := getKafkaSchemaVersion(
ctx,
avnGen,
schemaID,
schema.Spec.Project,
schema.Spec.ServiceName,
schema.Spec.SubjectName,
)
schema.Status.Version = version
return err
},
retry.Context(ctx),
retry.RetryIf(isNotFound),
retry.Delay(pollDelay),
retry.Attempts(pollAttempts),
)
if err != nil {
return fmt.Errorf("cannot get Kafka Schema Subject version: %w", err)
}

schema.Status.Version = version

meta.SetStatusCondition(&schema.Status.Conditions,
getInitializedCondition("Added",
"Successfully created or updated the instance in Aiven"))
Expand All @@ -105,12 +143,8 @@ func (h KafkaSchemaHandler) delete(ctx context.Context, avn *aiven.Client, avnGe
return false, err
}

err = avn.KafkaSubjectSchemas.Delete(ctx, schema.Spec.Project, schema.Spec.ServiceName, schema.Spec.SubjectName)
if err != nil && !isNotFound(err) {
return false, fmt.Errorf("aiven client delete Kafka Schema error: %w", err)
}

return true, nil
err = avnGen.ServiceSchemaRegistrySubjectDelete(ctx, schema.Spec.Project, schema.Spec.ServiceName, schema.Spec.SubjectName)
return isDeleted(err)
}

func (h KafkaSchemaHandler) get(ctx context.Context, avn *aiven.Client, avnGen avngen.Client, obj client.Object) (*corev1.Secret, error) {
Expand Down Expand Up @@ -146,22 +180,22 @@ func (h KafkaSchemaHandler) convert(i client.Object) (*v1alpha1.KafkaSchema, err
return schema, nil
}

func (h KafkaSchemaHandler) getLastVersion(ctx context.Context, avn *aiven.Client, schema *v1alpha1.KafkaSchema) (int, error) {
ver, err := avn.KafkaSubjectSchemas.GetVersions(
ctx,
schema.Spec.Project,
schema.Spec.ServiceName,
schema.Spec.SubjectName)
func getKafkaSchemaVersion(ctx context.Context, client avngen.Client, schemaID int, projectName, serviceName, subjectName string) (int, error) {
versions, err := client.ServiceSchemaRegistrySubjectVersionsGet(ctx, projectName, serviceName, subjectName)
if err != nil {
return 0, err
}

var latestVersion int
for _, v := range ver.Versions {
if v > latestVersion {
latestVersion = v
for _, v := range versions {
version, err := client.ServiceSchemaRegistrySubjectVersionGet(ctx, projectName, serviceName, subjectName, v)
if err != nil {
return 0, err
}

if version.Id == schemaID {
return version.Version, nil
}
}

return latestVersion, nil
return 0, NewNotFound(fmt.Sprintf("the schema with id %d not found", schemaID))
}
1 change: 1 addition & 0 deletions docs/docs/api-reference/kafkaschema.md
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ KafkaSchemaSpec defines the desired state of KafkaSchema.

- [`authSecretRef`](#spec.authSecretRef-property){: name='spec.authSecretRef-property'} (object). Authentication reference to Aiven token in a secret. See below for [nested schema](#spec.authSecretRef).
- [`compatibilityLevel`](#spec.compatibilityLevel-property){: name='spec.compatibilityLevel-property'} (string, Enum: `BACKWARD`, `BACKWARD_TRANSITIVE`, `FORWARD`, `FORWARD_TRANSITIVE`, `FULL`, `FULL_TRANSITIVE`, `NONE`). Kafka Schemas compatibility level.
- [`schemaType`](#spec.schemaType-property){: name='spec.schemaType-property'} (string, Enum: `AVRO`, `JSON`, `PROTOBUF`). Schema type.

## authSecretRef {: #spec.authSecretRef }

Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ require (
github.com/aiven/aiven-go-client/v2 v2.26.0
github.com/aiven/go-api-schemas v1.85.0
github.com/aiven/go-client-codegen v0.31.0
github.com/avast/retry-go v3.0.0+incompatible
github.com/dave/jennifer v1.7.1
github.com/docker/go-units v0.5.0
github.com/ghodss/yaml v1.0.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRF
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho=
github.com/andybalholm/brotli v1.1.0 h1:eLKJA0d02Lf0mVpIDgYnqXcUn0GqVmEFny3VuID1U3M=
github.com/andybalholm/brotli v1.1.0/go.mod h1:sms7XGricyQI9K10gOSf56VKKWS4oLer58Q+mhRPtnY=
github.com/avast/retry-go v3.0.0+incompatible h1:4SOWQ7Qs+oroOTQOYnAHqelpCO0biHSxpiH9JdtuBj0=
github.com/avast/retry-go v3.0.0+incompatible/go.mod h1:XtSnn+n/sHqQIpZ10K1qAevBhOOCWBLXXy3hyiqqBrY=
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
Expand Down
5 changes: 4 additions & 1 deletion tests/kafkaschema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"testing"

"github.com/aiven/go-client-codegen/handler/kafkaschemaregistry"
"github.com/google/go-cmp/cmp"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -43,6 +44,7 @@ spec:
project: %[1]s
serviceName: %[2]s
subjectName: %[4]s
schemaType: AVRO
compatibilityLevel: BACKWARD
schema: |
{
Expand Down Expand Up @@ -104,7 +106,8 @@ func TestKafkaSchema(t *testing.T) {
assert.Equal(t, schemaName, schema.Name)
assert.Equal(t, subjectName, schema.Spec.SubjectName)
assert.Equal(t, kafkaName, schema.Spec.ServiceName)
assert.Equal(t, "BACKWARD", schema.Spec.CompatibilityLevel)
assert.Equal(t, kafkaschemaregistry.SchemaTypeAvro, schema.Spec.SchemaType)
assert.Equal(t, kafkaschemaregistry.CompatibilityTypeBackward, schema.Spec.CompatibilityLevel)

type schemaType struct {
Default any `json:"default,omitempty"`
Expand Down
Loading