From 60e27faa4a63523f4780e5298e7138ad6b0caf34 Mon Sep 17 00:00:00 2001 From: Aleksander Zaruczewski Date: Tue, 26 Sep 2023 14:53:58 -0700 Subject: [PATCH] feat(kafka): allow protobuf schema --- CHANGELOG.md | 1 + docs/data-sources/kafka_schema.md | 4 +- .../kafka_schema_configuration.md | 4 +- docs/resources/kafka_schema.md | 4 +- .../service/kafkaschema/kafka_schema.go | 51 ++++++++++++---- .../service/kafkaschema/kafka_schema_test.go | 61 ++++++++++++++++--- 6 files changed, 97 insertions(+), 28 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index fc7682638..27d8b2086 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ nav_order: 1 - Allow to modify `pg_user` replication settings - Fix `aiven_project_user` 409 error handling +- Allow usage of `protobuf` schema in Kafka ## [4.9.0] - 2023-09-18 diff --git a/docs/data-sources/kafka_schema.md b/docs/data-sources/kafka_schema.md index 34d55ec1c..95863ee08 100644 --- a/docs/data-sources/kafka_schema.md +++ b/docs/data-sources/kafka_schema.md @@ -32,6 +32,6 @@ data "aiven_kafka_schema_configuration" "config" { - `compatibility_level` (String) Kafka Schemas compatibility level. The possible values are `BACKWARD`, `BACKWARD_TRANSITIVE`, `FORWARD`, `FORWARD_TRANSITIVE`, `FULL`, `FULL_TRANSITIVE` and `NONE`. - `id` (String) The ID of this resource. -- `schema` (String) Kafka Schema configuration should be a valid Avro Schema JSON format. -- `schema_type` (String) Kafka Schema type JSON or AVRO +- `schema` (String) Kafka Schema configuration. Should be a valid Avro, JSON, or Protobuf schema, depending on the schema type. +- `schema_type` (String) Kafka Schema configuration type. Defaults to AVRO. Possible values are AVRO, JSON, and PROTOBUF. - `version` (Number) Kafka Schema configuration version. diff --git a/docs/data-sources/kafka_schema_configuration.md b/docs/data-sources/kafka_schema_configuration.md index 43d4bdaea..4a95fad86 100644 --- a/docs/data-sources/kafka_schema_configuration.md +++ b/docs/data-sources/kafka_schema_configuration.md @@ -32,7 +32,7 @@ resource "aiven_kafka_schema_configuration" "config" { - `compatibility_level` (String) Kafka Schemas compatibility level. The possible values are `BACKWARD`, `BACKWARD_TRANSITIVE`, `FORWARD`, `FORWARD_TRANSITIVE`, `FULL`, `FULL_TRANSITIVE` and `NONE`. - `id` (String) The ID of this resource. -- `schema` (String) Kafka Schema configuration should be a valid Avro Schema JSON format. -- `schema_type` (String) Kafka Schema type JSON or AVRO +- `schema` (String) Kafka Schema configuration. Should be a valid Avro, JSON, or Protobuf schema, depending on the schema type. +- `schema_type` (String) Kafka Schema configuration type. Defaults to AVRO. Possible values are AVRO, JSON, and PROTOBUF. - `subject_name` (String) The Kafka Schema Subject name. This property cannot be changed, doing so forces recreation of the resource. - `version` (Number) Kafka Schema configuration version. diff --git a/docs/resources/kafka_schema.md b/docs/resources/kafka_schema.md index 5c15c8f45..aa2097477 100644 --- a/docs/resources/kafka_schema.md +++ b/docs/resources/kafka_schema.md @@ -43,14 +43,14 @@ resource "aiven_kafka_schema" "kafka-schema1" { ### Required - `project` (String) Identifies the project this resource belongs to. To set up proper dependencies please refer to this variable as a reference. This property cannot be changed, doing so forces recreation of the resource. -- `schema` (String) Kafka Schema configuration should be a valid Avro Schema JSON format. +- `schema` (String) Kafka Schema configuration. Should be a valid Avro, JSON, or Protobuf schema, depending on the schema type. - `service_name` (String) Specifies the name of the service that this resource belongs to. To set up proper dependencies please refer to this variable as a reference. This property cannot be changed, doing so forces recreation of the resource. - `subject_name` (String) The Kafka Schema Subject name. This property cannot be changed, doing so forces recreation of the resource. ### Optional - `compatibility_level` (String) Kafka Schemas compatibility level. The possible values are `BACKWARD`, `BACKWARD_TRANSITIVE`, `FORWARD`, `FORWARD_TRANSITIVE`, `FULL`, `FULL_TRANSITIVE` and `NONE`. -- `schema_type` (String) Kafka Schema type JSON or AVRO +- `schema_type` (String) Kafka Schema configuration type. Defaults to AVRO. Possible values are AVRO, JSON, and PROTOBUF. - `timeouts` (Block, Optional) (see [below for nested schema](#nestedblock--timeouts)) ### Read-Only diff --git a/internal/sdkprovider/service/kafkaschema/kafka_schema.go b/internal/sdkprovider/service/kafkaschema/kafka_schema.go index 956685868..b9738c55f 100644 --- a/internal/sdkprovider/service/kafkaschema/kafka_schema.go +++ b/internal/sdkprovider/service/kafkaschema/kafka_schema.go @@ -5,6 +5,7 @@ import ( "encoding/json" "fmt" "reflect" + "regexp" "github.com/aiven/aiven-go-client/v2" "github.com/hashicorp/terraform-plugin-sdk/v2/diag" @@ -16,6 +17,9 @@ import ( "github.com/aiven/terraform-provider-aiven/internal/schemautil/userconfig" ) +// newlineRegExp is a regular expression that matches a newline. +var newlineRegExp = regexp.MustCompile(`\r?\n`) + var aivenKafkaSchemaSchema = map[string]*schema.Schema{ "project": schemautil.CommonSchemaProjectReference, "service_name": schemautil.CommonSchemaServiceNameReference, @@ -28,18 +32,19 @@ var aivenKafkaSchemaSchema = map[string]*schema.Schema{ "schema": { Type: schema.TypeString, Required: true, - ValidateFunc: validation.StringIsJSON, - StateFunc: normalizeJSONString, - DiffSuppressFunc: diffSuppressJSONObject, - Description: "Kafka Schema configuration should be a valid Avro Schema JSON format.", + StateFunc: normalizeJSONOrProtobufString, + DiffSuppressFunc: diffSuppressJSONObjectOrProtobufString, + Description: "Kafka Schema configuration. Should be a valid Avro, JSON, or Protobuf schema," + + " depending on the schema type.", }, "schema_type": { - Type: schema.TypeString, - Optional: true, - ForceNew: true, - Description: "Kafka Schema type JSON or AVRO", + Type: schema.TypeString, + Optional: true, + ForceNew: true, + Description: "Kafka Schema configuration type. Defaults to AVRO. Possible values are AVRO, JSON, " + + "and PROTOBUF.", Default: "AVRO", - ValidateFunc: validation.StringInSlice([]string{"AVRO", "JSON"}, false), + ValidateFunc: validation.StringInSlice([]string{"AVRO", "JSON", "PROTOBUF"}, false), DiffSuppressFunc: func(k, oldValue, newValue string, d *schema.ResourceData) bool { // This field can't be retrieved once resource is created. // That produces a diff on plan on resource import. @@ -79,11 +84,31 @@ func diffSuppressJSONObject(_, old, new string, _ *schema.ResourceData) bool { return reflect.DeepEqual(objNew, objOld) } -// normalizeJSONString returns normalized JSON string -func normalizeJSONString(v interface{}) string { - jsonString, _ := structure.NormalizeJsonString(v) +// diffSuppressJSONObjectOrProtobufString checks logical equivalences in JSON or Protobuf Kafka Schema values. +func diffSuppressJSONObjectOrProtobufString(k, old, new string, d *schema.ResourceData) bool { + if !diffSuppressJSONObject(k, old, new, d) { + return normalizeProtobufString(old) == normalizeProtobufString(new) + } + + return false +} + +// normalizeProtobufString returns normalized Protobuf string. +func normalizeProtobufString(i any) string { + v := i.(string) + + return newlineRegExp.ReplaceAllString(v, "") +} + +// normalizeJSONOrProtobufString returns normalized JSON or Protobuf string. +func normalizeJSONOrProtobufString(i any) string { + v := i.(string) + + if n, err := structure.NormalizeJsonString(v); err == nil { + return n + } - return jsonString + return normalizeProtobufString(v) } func ResourceKafkaSchema() *schema.Resource { diff --git a/internal/sdkprovider/service/kafkaschema/kafka_schema_test.go b/internal/sdkprovider/service/kafkaschema/kafka_schema_test.go index 7d3725b2e..b0d850448 100644 --- a/internal/sdkprovider/service/kafkaschema/kafka_schema_test.go +++ b/internal/sdkprovider/service/kafkaschema/kafka_schema_test.go @@ -92,8 +92,11 @@ resource "aiven_kafka_schema" "schema" { `, project, serviceName, subjectName) } -func TestAccAivenKafkaSchema_json_basic(t *testing.T) { +// TestAccAivenKafkaSchema_json_protobuf_basic is a test for JSON and Protobuf schema Kafka Schema resource. +func TestAccAivenKafkaSchema_json_protobuf_basic(t *testing.T) { resourceName := "aiven_kafka_schema.foo" + resourceName2 := "aiven_kafka_schema.bar" + rName := acctest.RandStringFromCharSet(10, acctest.CharSetAlphaNum) resource.ParallelTest(t, resource.TestCase{ @@ -102,19 +105,34 @@ func TestAccAivenKafkaSchema_json_basic(t *testing.T) { CheckDestroy: testAccCheckAivenKafkaSchemaResourceDestroy, Steps: []resource.TestStep{ { - Config: testAccKafkaSchemaJSONResource(rName), + Config: testAccKafkaSchemaJSONProtobufResource(rName), Check: resource.ComposeTestCheckFunc( testAccCheckAivenKafkaSchemaAttributes("data.aiven_kafka_schema.schema"), + testAccCheckAivenKafkaSchemaAttributes("data.aiven_kafka_schema.schema2"), resource.TestCheckResourceAttr(resourceName, "project", os.Getenv("AIVEN_PROJECT_NAME")), - resource.TestCheckResourceAttr(resourceName, "service_name", fmt.Sprintf("test-acc-sr-%s", rName)), - resource.TestCheckResourceAttr(resourceName, "subject_name", fmt.Sprintf("kafka-schema-%s", rName)), + resource.TestCheckResourceAttr( + resourceName, "service_name", fmt.Sprintf("test-acc-sr-%s", rName), + ), + resource.TestCheckResourceAttr( + resourceName, "subject_name", fmt.Sprintf("kafka-schema-%s-foo", rName), + ), resource.TestCheckResourceAttr(resourceName, "version", "1"), resource.TestCheckResourceAttr(resourceName, "schema_type", "JSON"), + resource.TestCheckResourceAttr(resourceName2, "project", os.Getenv("AIVEN_PROJECT_NAME")), + resource.TestCheckResourceAttr( + resourceName2, "service_name", fmt.Sprintf("test-acc-sr-%s", rName), + ), + resource.TestCheckResourceAttr( + resourceName2, "subject_name", fmt.Sprintf("kafka-schema-%s-bar", rName), + ), + resource.TestCheckResourceAttr(resourceName2, "version", "1"), + resource.TestCheckResourceAttr(resourceName2, "schema_type", "PROTOBUF"), ), }, }, }) } + func TestAccAivenKafkaSchema_basic(t *testing.T) { resourceName := "aiven_kafka_schema.foo" rName := acctest.RandStringFromCharSet(10, acctest.CharSetAlphaNum) @@ -208,17 +226,18 @@ func testAccCheckAivenKafkaSchemaResourceDestroy(s *terraform.State) error { return nil } -func testAccKafkaSchemaJSONResource(name string) string { +// testAccKafkaSchemaJSONProtobufResource is a test resource for JSON and Protobuf schema Kafka Schema resource. +func testAccKafkaSchemaJSONProtobufResource(name string) string { return fmt.Sprintf(` data "aiven_project" "foo" { - project = "%s" + project = "%[1]s" } resource "aiven_kafka" "bar" { project = data.aiven_project.foo.project cloud_name = "google-europe-west1" plan = "startup-2" - service_name = "test-acc-sr-%s" + service_name = "test-acc-sr-%[2]s" maintenance_window_dow = "monday" maintenance_window_time = "10:00:00" @@ -241,7 +260,7 @@ resource "aiven_kafka_schema_configuration" "foo" { resource "aiven_kafka_schema" "foo" { project = aiven_kafka_schema_configuration.foo.project service_name = aiven_kafka_schema_configuration.foo.service_name - subject_name = "kafka-schema-%s" + subject_name = "kafka-schema-%[2]s-foo" schema_type = "JSON" schema = <