From 133e58c032393c095a0190d799cc4a970c251873 Mon Sep 17 00:00:00 2001 From: Murad Biashimov Date: Wed, 13 Dec 2023 13:26:54 +0100 Subject: [PATCH] feat(kafka): expose schema registry and rest uris (#563) --- CHANGELOG.md | 1 + controllers/kafka_controller.go | 16 +++++++++------- tests/kafka_test.go | 2 ++ 3 files changed, 12 insertions(+), 7 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 44511a7b..ded6248b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,7 @@ ## [MAJOR.MINOR.PATCH] - YYYY-MM-DD - Check VPC for running services before deletion +- Expose `KAFKA_SCHEMA_REGISTRY_URI` and `KAFKA_REST_URI` to `Kafka` secret ## v0.16.0 - 2023-12-07 diff --git a/controllers/kafka_controller.go b/controllers/kafka_controller.go index 07f4dad2..497a3b02 100644 --- a/controllers/kafka_controller.go +++ b/controllers/kafka_controller.go @@ -80,13 +80,15 @@ func (a *kafkaAdapter) newSecret(ctx context.Context, s *aiven.Service) (*corev1 prefix := getSecretPrefix(a) stringData := map[string]string{ - prefix + "HOST": s.URIParams["host"], - prefix + "PORT": s.URIParams["port"], - prefix + "PASSWORD": password, - prefix + "USERNAME": userName, - prefix + "ACCESS_CERT": s.ConnectionInfo.KafkaAccessCert, - prefix + "ACCESS_KEY": s.ConnectionInfo.KafkaAccessKey, - prefix + "CA_CERT": caCert, + prefix + "HOST": s.URIParams["host"], + prefix + "PORT": s.URIParams["port"], + prefix + "PASSWORD": password, + prefix + "USERNAME": userName, + prefix + "ACCESS_CERT": s.ConnectionInfo.KafkaAccessCert, + prefix + "ACCESS_KEY": s.ConnectionInfo.KafkaAccessKey, + prefix + "REST_URI": s.ConnectionInfo.KafkaRestURI, + prefix + "SCHEMA_REGISTRY_URI": s.ConnectionInfo.SchemaRegistryURI, + prefix + "CA_CERT": caCert, // todo: remove in future releases "HOST": s.URIParams["host"], "PORT": s.URIParams["port"], diff --git a/tests/kafka_test.go b/tests/kafka_test.go index 6ba75e2f..05787b64 100644 --- a/tests/kafka_test.go +++ b/tests/kafka_test.go @@ -129,6 +129,7 @@ func TestKafka(t *testing.T) { // Schema registry test assert.Equal(t, anyPointer(true), ks.Spec.UserConfig.SchemaRegistry) + assert.NotEmpty(t, secret.Data["KAFKA_SCHEMA_REGISTRY_URI"]) assert.NotEmpty(t, secret.Data["KAFKA_SCHEMA_REGISTRY_HOST"]) assert.NotEmpty(t, secret.Data["KAFKA_SCHEMA_REGISTRY_PORT"]) assert.NotEqual(t, secret.Data["KAFKA_SCHEMA_REGISTRY_PORT"], secret.Data["KAFKA_PORT"]) @@ -143,6 +144,7 @@ func TestKafka(t *testing.T) { // Kafka REST test assert.Equal(t, anyPointer(true), ks.Spec.UserConfig.KafkaRest) + assert.NotEmpty(t, secret.Data["KAFKA_REST_URI"]) assert.NotEmpty(t, secret.Data["KAFKA_REST_HOST"]) assert.NotEmpty(t, secret.Data["KAFKA_REST_PORT"]) assert.NotEqual(t, secret.Data["KAFKA_REST_PORT"], secret.Data["KAFKA_PORT"])