Skip to content

Commit

Permalink
fix(kafkaschema): poll 404
Browse files Browse the repository at this point in the history
  • Loading branch information
byashimov committed Jul 25, 2024
1 parent 1111d64 commit 199ebfb
Show file tree
Hide file tree
Showing 10 changed files with 84 additions and 55 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

- Fix `KafkaTopic`: fails to create a topic with the replication factor set more than running Kafka nodes
- Fix `ServiceIntegration`: sends empty source and destination projects
- Add `KafkaSchema` field `schemaType`, type `string`: Schema type

## v0.24.0 - 2024-07-16

Expand Down
4 changes: 4 additions & 0 deletions api/v1alpha1/kafkaschema_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ type KafkaSchemaSpec struct {
// Kafka Schema configuration should be a valid Avro Schema JSON format
Schema string `json:"schema"`

// +kubebuilder:validation:Enum=AVRO;JSON;PROTOBUF
// Schema type
SchemaType string `json:"schemaType,omitempty"`

// +kubebuilder:validation:Enum=BACKWARD;BACKWARD_TRANSITIVE;FORWARD;FORWARD_TRANSITIVE;FULL;FULL_TRANSITIVE;NONE
// Kafka Schemas compatibility level
CompatibilityLevel string `json:"compatibilityLevel,omitempty"`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,13 @@ spec:
Kafka Schema configuration should be a valid Avro Schema
JSON format
type: string
schemaType:
description: Schema type
enum:
- AVRO
- JSON
- PROTOBUF
type: string
serviceName:
description:
Specifies the name of the service that this resource
Expand Down
7 changes: 7 additions & 0 deletions config/crd/bases/aiven.io_kafkaschemas.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,13 @@ spec:
Kafka Schema configuration should be a valid Avro Schema
JSON format
type: string
schemaType:
description: Schema type
enum:
- AVRO
- JSON
- PROTOBUF
type: string
serviceName:
description:
Specifies the name of the service that this resource
Expand Down
26 changes: 26 additions & 0 deletions controllers/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,32 @@ func serviceIsRunning[T service.ServiceStateType | string](state T) bool {
return service.ServiceStateType(state) == service.ServiceStateTypeRunning
}

const allNodesRunning = -1

func minNodesRunning(ctx context.Context, avnGen avngen.Client, project, serviceName string, nodesRunning int) (bool, error) {
s, err := avnGen.ServiceGet(ctx, project, serviceName)
if isNotFound(err) {
return false, nil
}

if err != nil {
return false, err
}

running := 0
for _, node := range s.NodeStates {
if node.State == service.NodeStateTypeRunning {
running++
}
}

if nodesRunning == allNodesRunning {
nodesRunning = len(s.NodeStates)
}

return running >= min(len(s.NodeStates), nodesRunning), nil
}

func getInitializedCondition(reason, message string) metav1.Condition {
return metav1.Condition{
Type: conditionTypeInitialized,
Expand Down
72 changes: 34 additions & 38 deletions controllers/kafkaschema_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,13 @@ import (
"context"
"fmt"
"strconv"
"time"

"github.com/aiven/aiven-go-client/v2"
avngen "github.com/aiven/go-client-codegen"
"github.com/aiven/go-client-codegen/handler/kafkaschemaregistry"
"github.com/avast/retry-go"
"golang.org/x/exp/slices"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -43,20 +47,20 @@ func (r *KafkaSchemaReconciler) SetupWithManager(mgr ctrl.Manager) error {
Complete(r)
}

func (h KafkaSchemaHandler) createOrUpdate(ctx context.Context, avn *aiven.Client, avnGen avngen.Client, obj client.Object, refs []client.Object) error {
func (h KafkaSchemaHandler) createOrUpdate(ctx context.Context, avn *aiven.Client, avnGen avngen.Client, obj client.Object, _ []client.Object) error {
schema, err := h.convert(obj)
if err != nil {
return err
}

// createOrUpdate Kafka Schema Subject
_, err = avn.KafkaSubjectSchemas.Add(
_, err = avnGen.ServiceSchemaRegistrySubjectVersionPost(
ctx,
schema.Spec.Project,
schema.Spec.ServiceName,
schema.Spec.SubjectName,
aiven.KafkaSchemaSubject{
Schema: schema.Spec.Schema,
&kafkaschemaregistry.ServiceSchemaRegistrySubjectVersionPostIn{
Schema: schema.Spec.Schema,
SchemaType: kafkaschemaregistry.SchemaType(schema.Spec.SchemaType),
},
)
if err != nil {
Expand All @@ -65,26 +69,42 @@ func (h KafkaSchemaHandler) createOrUpdate(ctx context.Context, avn *aiven.Clien

// set compatibility level if defined for a newly created Kafka Schema Subject
if schema.Spec.CompatibilityLevel != "" {
_, err := avn.KafkaSubjectSchemas.UpdateConfiguration(
_, err := avnGen.ServiceSchemaRegistrySubjectConfigPut(
ctx,
schema.Spec.Project,
schema.Spec.ServiceName,
schema.Spec.SubjectName,
schema.Spec.CompatibilityLevel,
&kafkaschemaregistry.ServiceSchemaRegistrySubjectConfigPutIn{
Compatibility: kafkaschemaregistry.CompatibilityType(schema.Spec.CompatibilityLevel),
},
)
if err != nil {
return fmt.Errorf("cannot update Kafka Schema Configuration: %w", err)
}
}

// get last version
version, err := h.getLastVersion(ctx, avn, schema)
// Gets the last version
const (
pollDelay = 5 * time.Second
pollAttempts = 10
)
err = retry.Do(
func() error {
versions, err := avnGen.ServiceSchemaRegistrySubjectVersionsGet(ctx, schema.Spec.Project, schema.Spec.ServiceName, schema.Spec.SubjectName)
if err == nil && len(versions) > 0 {
schema.Status.Version = slices.Max(versions)
}
return err
},
retry.Context(ctx),
retry.RetryIf(isNotFound),
retry.Delay(pollDelay),
retry.Attempts(pollAttempts),
)
if err != nil {
return fmt.Errorf("cannot get Kafka Schema Subject version: %w", err)
}

schema.Status.Version = version

meta.SetStatusCondition(&schema.Status.Conditions,
getInitializedCondition("Added",
"Successfully created or updated the instance in Aiven"))
Expand All @@ -105,12 +125,8 @@ func (h KafkaSchemaHandler) delete(ctx context.Context, avn *aiven.Client, avnGe
return false, err
}

err = avn.KafkaSubjectSchemas.Delete(ctx, schema.Spec.Project, schema.Spec.ServiceName, schema.Spec.SubjectName)
if err != nil && !isNotFound(err) {
return false, fmt.Errorf("aiven client delete Kafka Schema error: %w", err)
}

return true, nil
err = avnGen.ServiceSchemaRegistrySubjectDelete(ctx, schema.Spec.Project, schema.Spec.ServiceName, schema.Spec.SubjectName)
return isDeleted(err)
}

func (h KafkaSchemaHandler) get(ctx context.Context, avn *aiven.Client, avnGen avngen.Client, obj client.Object) (*corev1.Secret, error) {
Expand All @@ -134,7 +150,7 @@ func (h KafkaSchemaHandler) checkPreconditions(ctx context.Context, avn *aiven.C
return false, err
}

return checkServiceIsRunning(ctx, avn, avnGen, schema.Spec.Project, schema.Spec.ServiceName)
return minNodesRunning(ctx, avnGen, schema.Spec.Project, schema.Spec.ServiceName, allNodesRunning)
}

func (h KafkaSchemaHandler) convert(i client.Object) (*v1alpha1.KafkaSchema, error) {
Expand All @@ -145,23 +161,3 @@ func (h KafkaSchemaHandler) convert(i client.Object) (*v1alpha1.KafkaSchema, err

return schema, nil
}

func (h KafkaSchemaHandler) getLastVersion(ctx context.Context, avn *aiven.Client, schema *v1alpha1.KafkaSchema) (int, error) {
ver, err := avn.KafkaSubjectSchemas.GetVersions(
ctx,
schema.Spec.Project,
schema.Spec.ServiceName,
schema.Spec.SubjectName)
if err != nil {
return 0, err
}

var latestVersion int
for _, v := range ver.Versions {
if v > latestVersion {
latestVersion = v
}
}

return latestVersion, nil
}
18 changes: 1 addition & 17 deletions controllers/kafkatopic_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (

"github.com/aiven/aiven-go-client/v2"
avngen "github.com/aiven/go-client-codegen"
"github.com/aiven/go-client-codegen/handler/service"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -175,24 +174,9 @@ func (h KafkaTopicHandler) checkPreconditions(ctx context.Context, avn *aiven.Cl
meta.SetStatusCondition(&topic.Status.Conditions,
getInitializedCondition("Preconditions", "Checking preconditions"))

s, err := avnGen.ServiceGet(ctx, topic.Spec.Project, topic.Spec.ServiceName)
if isNotFound(err) {
return false, nil
}

if err != nil {
return false, err
}

running := 0
for _, node := range s.NodeStates {
if node.State == service.NodeStateTypeRunning {
running++
}
}
// Replication factor requires enough nodes running.
// But we want to get the backend validation error if the value is too high
return running >= min(len(s.NodeStates), topic.Spec.Replication), nil
return minNodesRunning(ctx, avnGen, topic.Spec.Project, topic.Spec.ServiceName, topic.Spec.Replication)
}

func (h KafkaTopicHandler) getState(ctx context.Context, avn *aiven.Client, topic *v1alpha1.KafkaTopic) (string, error) {
Expand Down
1 change: 1 addition & 0 deletions docs/docs/api-reference/kafkaschema.md
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ KafkaSchemaSpec defines the desired state of KafkaSchema.

- [`authSecretRef`](#spec.authSecretRef-property){: name='spec.authSecretRef-property'} (object). Authentication reference to Aiven token in a secret. See below for [nested schema](#spec.authSecretRef).
- [`compatibilityLevel`](#spec.compatibilityLevel-property){: name='spec.compatibilityLevel-property'} (string, Enum: `BACKWARD`, `BACKWARD_TRANSITIVE`, `FORWARD`, `FORWARD_TRANSITIVE`, `FULL`, `FULL_TRANSITIVE`, `NONE`). Kafka Schemas compatibility level.
- [`schemaType`](#spec.schemaType-property){: name='spec.schemaType-property'} (string, Enum: `AVRO`, `JSON`, `PROTOBUF`). Schema type.

## authSecretRef {: #spec.authSecretRef }

Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ require (
require (
github.com/ClickHouse/ch-go v0.61.5 // indirect
github.com/andybalholm/brotli v1.1.0 // indirect
github.com/avast/retry-go v3.0.0+incompatible // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRF
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho=
github.com/andybalholm/brotli v1.1.0 h1:eLKJA0d02Lf0mVpIDgYnqXcUn0GqVmEFny3VuID1U3M=
github.com/andybalholm/brotli v1.1.0/go.mod h1:sms7XGricyQI9K10gOSf56VKKWS4oLer58Q+mhRPtnY=
github.com/avast/retry-go v3.0.0+incompatible h1:4SOWQ7Qs+oroOTQOYnAHqelpCO0biHSxpiH9JdtuBj0=
github.com/avast/retry-go v3.0.0+incompatible/go.mod h1:XtSnn+n/sHqQIpZ10K1qAevBhOOCWBLXXy3hyiqqBrY=
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
Expand Down

0 comments on commit 199ebfb

Please sign in to comment.