Skip to content

Commit

Permalink
fix(kafkaschema): poll schema version
Browse files Browse the repository at this point in the history
  • Loading branch information
byashimov committed Jul 26, 2024
1 parent fe9fc2b commit 4a886f5
Show file tree
Hide file tree
Showing 11 changed files with 110 additions and 59 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

- Fix `KafkaTopic`: fails to create a topic with the replication factor set more than running Kafka nodes
- Fix `ServiceIntegration`: sends empty source and destination projects
- Add `KafkaSchema` field `schemaType`, type `string`: Schema type

## v0.24.0 - 2024-07-16

Expand Down
4 changes: 4 additions & 0 deletions api/v1alpha1/kafkaschema_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ type KafkaSchemaSpec struct {
// Kafka Schema configuration should be a valid Avro Schema JSON format
Schema string `json:"schema"`

// +kubebuilder:validation:Enum=AVRO;JSON;PROTOBUF
// Schema type
SchemaType string `json:"schemaType,omitempty"`

// +kubebuilder:validation:Enum=BACKWARD;BACKWARD_TRANSITIVE;FORWARD;FORWARD_TRANSITIVE;FULL;FULL_TRANSITIVE;NONE
// Kafka Schemas compatibility level
CompatibilityLevel string `json:"compatibilityLevel,omitempty"`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,13 @@ spec:
Kafka Schema configuration should be a valid Avro Schema
JSON format
type: string
schemaType:
description: Schema type
enum:
- AVRO
- JSON
- PROTOBUF
type: string
serviceName:
description:
Specifies the name of the service that this resource
Expand Down
7 changes: 7 additions & 0 deletions config/crd/bases/aiven.io_kafkaschemas.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,13 @@ spec:
Kafka Schema configuration should be a valid Avro Schema
JSON format
type: string
schemaType:
description: Schema type
enum:
- AVRO
- JSON
- PROTOBUF
type: string
serviceName:
description:
Specifies the name of the service that this resource
Expand Down
26 changes: 26 additions & 0 deletions controllers/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,32 @@ func serviceIsRunning[T service.ServiceStateType | string](state T) bool {
return service.ServiceStateType(state) == service.ServiceStateTypeRunning
}

const allNodesRunning = -1

func minNodesRunning(ctx context.Context, avnGen avngen.Client, project, serviceName string, nodesRunning int) (bool, error) {
s, err := avnGen.ServiceGet(ctx, project, serviceName)
if isNotFound(err) {
return false, nil
}

if err != nil {
return false, err
}

running := 0
for _, node := range s.NodeStates {
if node.State == service.NodeStateTypeRunning {
running++
}
}

if nodesRunning == allNodesRunning {
nodesRunning = len(s.NodeStates)
}

return running >= min(len(s.NodeStates), nodesRunning), nil
}

func getInitializedCondition(reason, message string) metav1.Condition {
return metav1.Condition{
Type: conditionTypeInitialized,
Expand Down
83 changes: 46 additions & 37 deletions controllers/kafkaschema_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,12 @@ import (
"context"
"fmt"
"strconv"
"time"

"github.com/aiven/aiven-go-client/v2"
avngen "github.com/aiven/go-client-codegen"
"github.com/aiven/go-client-codegen/handler/kafkaschemaregistry"
"github.com/avast/retry-go"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -43,20 +46,20 @@ func (r *KafkaSchemaReconciler) SetupWithManager(mgr ctrl.Manager) error {
Complete(r)
}

func (h KafkaSchemaHandler) createOrUpdate(ctx context.Context, avn *aiven.Client, avnGen avngen.Client, obj client.Object, refs []client.Object) error {
func (h KafkaSchemaHandler) createOrUpdate(ctx context.Context, avn *aiven.Client, avnGen avngen.Client, obj client.Object, _ []client.Object) error {
schema, err := h.convert(obj)
if err != nil {
return err
}

// createOrUpdate Kafka Schema Subject
_, err = avn.KafkaSubjectSchemas.Add(
schemaID, err := avnGen.ServiceSchemaRegistrySubjectVersionPost(
ctx,
schema.Spec.Project,
schema.Spec.ServiceName,
schema.Spec.SubjectName,
aiven.KafkaSchemaSubject{
Schema: schema.Spec.Schema,
&kafkaschemaregistry.ServiceSchemaRegistrySubjectVersionPostIn{
Schema: schema.Spec.Schema,
SchemaType: kafkaschemaregistry.SchemaType(schema.Spec.SchemaType),
},
)
if err != nil {
Expand All @@ -65,26 +68,56 @@ func (h KafkaSchemaHandler) createOrUpdate(ctx context.Context, avn *aiven.Clien

// set compatibility level if defined for a newly created Kafka Schema Subject
if schema.Spec.CompatibilityLevel != "" {
_, err := avn.KafkaSubjectSchemas.UpdateConfiguration(
_, err := avnGen.ServiceSchemaRegistrySubjectConfigPut(
ctx,
schema.Spec.Project,
schema.Spec.ServiceName,
schema.Spec.SubjectName,
schema.Spec.CompatibilityLevel,
&kafkaschemaregistry.ServiceSchemaRegistrySubjectConfigPutIn{
Compatibility: kafkaschemaregistry.CompatibilityType(schema.Spec.CompatibilityLevel),
},
)
if err != nil {
return fmt.Errorf("cannot update Kafka Schema Configuration: %w", err)
}
}

// get last version
version, err := h.getLastVersion(ctx, avn, schema)
// Gets the last version
// Because of eventual consistency, we must poll the subject
const (
pollDelay = 5 * time.Second
pollAttempts = 10
)
err = retry.Do(
func() error {
versions, err := avnGen.ServiceSchemaRegistrySubjectVersionsGet(ctx, schema.Spec.Project, schema.Spec.ServiceName, schema.Spec.SubjectName)
if err != nil {
return err
}

for _, v := range versions {
version, err := avnGen.ServiceSchemaRegistrySubjectVersionGet(ctx, schema.Spec.Project, schema.Spec.ServiceName, schema.Spec.SubjectName, v)
if err != nil {
return err
}

if version.Id == schemaID {
schema.Status.Version = version.Version
return nil
}
}
return NewNotFound(fmt.Sprintf("the schema with id %d not found", schemaID))
},
retry.Context(ctx),
retry.RetryIf(isNotFound),
retry.Delay(pollDelay),
retry.Attempts(pollAttempts),
)

Check failure on line 116 in controllers/kafkaschema_controller.go

View workflow job for this annotation

GitHub Actions / Trunk Check

golangci-lint(gofumpt)

[new] File is not `gofumpt`-ed
if err != nil {
return fmt.Errorf("cannot get Kafka Schema Subject version: %w", err)
}

schema.Status.Version = version

meta.SetStatusCondition(&schema.Status.Conditions,
getInitializedCondition("Added",
"Successfully created or updated the instance in Aiven"))
Expand All @@ -105,12 +138,8 @@ func (h KafkaSchemaHandler) delete(ctx context.Context, avn *aiven.Client, avnGe
return false, err
}

err = avn.KafkaSubjectSchemas.Delete(ctx, schema.Spec.Project, schema.Spec.ServiceName, schema.Spec.SubjectName)
if err != nil && !isNotFound(err) {
return false, fmt.Errorf("aiven client delete Kafka Schema error: %w", err)
}

return true, nil
err = avnGen.ServiceSchemaRegistrySubjectDelete(ctx, schema.Spec.Project, schema.Spec.ServiceName, schema.Spec.SubjectName)
return isDeleted(err)
}

func (h KafkaSchemaHandler) get(ctx context.Context, avn *aiven.Client, avnGen avngen.Client, obj client.Object) (*corev1.Secret, error) {
Expand Down Expand Up @@ -145,23 +174,3 @@ func (h KafkaSchemaHandler) convert(i client.Object) (*v1alpha1.KafkaSchema, err

return schema, nil
}

func (h KafkaSchemaHandler) getLastVersion(ctx context.Context, avn *aiven.Client, schema *v1alpha1.KafkaSchema) (int, error) {
ver, err := avn.KafkaSubjectSchemas.GetVersions(
ctx,
schema.Spec.Project,
schema.Spec.ServiceName,
schema.Spec.SubjectName)
if err != nil {
return 0, err
}

var latestVersion int
for _, v := range ver.Versions {
if v > latestVersion {
latestVersion = v
}
}

return latestVersion, nil
}
18 changes: 1 addition & 17 deletions controllers/kafkatopic_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (

"github.com/aiven/aiven-go-client/v2"
avngen "github.com/aiven/go-client-codegen"
"github.com/aiven/go-client-codegen/handler/service"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -175,24 +174,9 @@ func (h KafkaTopicHandler) checkPreconditions(ctx context.Context, avn *aiven.Cl
meta.SetStatusCondition(&topic.Status.Conditions,
getInitializedCondition("Preconditions", "Checking preconditions"))

s, err := avnGen.ServiceGet(ctx, topic.Spec.Project, topic.Spec.ServiceName)
if isNotFound(err) {
return false, nil
}

if err != nil {
return false, err
}

running := 0
for _, node := range s.NodeStates {
if node.State == service.NodeStateTypeRunning {
running++
}
}
// Replication factor requires enough nodes running.
// But we want to get the backend validation error if the value is too high
return running >= min(len(s.NodeStates), topic.Spec.Replication), nil
return minNodesRunning(ctx, avnGen, topic.Spec.Project, topic.Spec.ServiceName, topic.Spec.Replication)
}

func (h KafkaTopicHandler) getState(ctx context.Context, avn *aiven.Client, topic *v1alpha1.KafkaTopic) (string, error) {
Expand Down
1 change: 1 addition & 0 deletions docs/docs/api-reference/kafkaschema.md
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ KafkaSchemaSpec defines the desired state of KafkaSchema.

- [`authSecretRef`](#spec.authSecretRef-property){: name='spec.authSecretRef-property'} (object). Authentication reference to Aiven token in a secret. See below for [nested schema](#spec.authSecretRef).
- [`compatibilityLevel`](#spec.compatibilityLevel-property){: name='spec.compatibilityLevel-property'} (string, Enum: `BACKWARD`, `BACKWARD_TRANSITIVE`, `FORWARD`, `FORWARD_TRANSITIVE`, `FULL`, `FULL_TRANSITIVE`, `NONE`). Kafka Schemas compatibility level.
- [`schemaType`](#spec.schemaType-property){: name='spec.schemaType-property'} (string, Enum: `AVRO`, `JSON`, `PROTOBUF`). Schema type.

## authSecretRef {: #spec.authSecretRef }

Expand Down
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ require (
github.com/ClickHouse/clickhouse-go/v2 v2.26.0
github.com/aiven/aiven-go-client/v2 v2.24.0
github.com/aiven/go-api-schemas v1.79.0
github.com/aiven/go-client-codegen v0.18.0
github.com/aiven/go-client-codegen v0.18.1-0.20240725162756-dcc0909158b6
github.com/avast/retry-go v3.0.0+incompatible
github.com/dave/jennifer v1.7.0
github.com/docker/go-units v0.5.0
github.com/ghodss/yaml v1.0.0
Expand Down
6 changes: 4 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,17 @@ github.com/aiven/aiven-go-client/v2 v2.24.0 h1:Ce9xXuTqs8EsjD0Vj67qhMnnYV/9UXzFL
github.com/aiven/aiven-go-client/v2 v2.24.0/go.mod h1:KdHfLIlIRZIfCSEBd39j1Q81jlSb6Nd+oCQKqERfnuA=
github.com/aiven/go-api-schemas v1.79.0 h1:V6H7XKbsgfwWWLBazj53ZiQygCZdMB9os2ZP5wvgzIw=
github.com/aiven/go-api-schemas v1.79.0/go.mod h1:FYR22WcKLisZ1CYqyyGk6XqNSyxfAUtaQd+P2ydwc5A=
github.com/aiven/go-client-codegen v0.18.0 h1:Bpt/++DojYTFsgz0iMuYsQSa0TOQkZ8JSQBQLPvUQus=
github.com/aiven/go-client-codegen v0.18.0/go.mod h1:Sajbdpjn1/m5g2D6EDfiSnxl9pj9hxe8+hpG1CkCkhs=
github.com/aiven/go-client-codegen v0.18.1-0.20240725162756-dcc0909158b6 h1:3I3iTg6aNp5JDLxmWj7ZLXI2McLxYpreK103Z7+ymbM=
github.com/aiven/go-client-codegen v0.18.1-0.20240725162756-dcc0909158b6/go.mod h1:Sajbdpjn1/m5g2D6EDfiSnxl9pj9hxe8+hpG1CkCkhs=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho=
github.com/andybalholm/brotli v1.1.0 h1:eLKJA0d02Lf0mVpIDgYnqXcUn0GqVmEFny3VuID1U3M=
github.com/andybalholm/brotli v1.1.0/go.mod h1:sms7XGricyQI9K10gOSf56VKKWS4oLer58Q+mhRPtnY=
github.com/avast/retry-go v3.0.0+incompatible h1:4SOWQ7Qs+oroOTQOYnAHqelpCO0biHSxpiH9JdtuBj0=
github.com/avast/retry-go v3.0.0+incompatible/go.mod h1:XtSnn+n/sHqQIpZ10K1qAevBhOOCWBLXXy3hyiqqBrY=
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
Expand Down
13 changes: 11 additions & 2 deletions tests/kafkaschema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,12 @@ func TestKafkaSchema(t *testing.T) {
s := NewSession(ctx, k8sClient, cfg.Project)

// Cleans test afterward
defer s.Destroy(t)
destroy := true
defer func() {
if destroy {
s.Destroy(t)
}
}()

// WHEN
// Applies given manifest
Expand All @@ -100,7 +105,11 @@ func TestKafkaSchema(t *testing.T) {

// KafkaSchema test
schema := new(v1alpha1.KafkaSchema)
require.NoError(t, s.GetRunning(schema, schemaName))
err = s.GetRunning(schema, schemaName)
if err != nil {
destroy = false
}
require.NoError(t, err)
assert.Equal(t, schemaName, schema.Name)
assert.Equal(t, subjectName, schema.Spec.SubjectName)
assert.Equal(t, kafkaName, schema.Spec.ServiceName)
Expand Down

0 comments on commit 4a886f5

Please sign in to comment.