From 176f225d29f347408ec8008184854076c7b85280 Mon Sep 17 00:00:00 2001
From: Aleksander Zaruczewski
Date: Tue, 26 Sep 2023 14:53:58 -0700
Subject: [PATCH] feat(kafka): allow protobuf schema

---
 CHANGELOG.md                                  |  1 +
 docs/data-sources/kafka_schema.md             |  4 +-
 .../kafka_schema_configuration.md             |  4 +-
 docs/resources/kafka_schema.md                |  4 +-
 .../service/kafkaschema/kafka_schema.go       | 71 ++++++++++++++--
 .../service/kafkaschema/kafka_schema_test.go  | 82 +++++++++++++++++++
 6 files changed, 151 insertions(+), 15 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 4f7c9dfe3..4aac88024 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -9,6 +9,7 @@ nav_order: 1
 
 ## [MAJOR.MINOR.PATCH] - YYYY-MM-DD
 
 - Allow to modify `pg_user` replication settings
+- Allow usage of `protobuf` schema in Kafka
 
 ## [4.9.0] - 2023-09-18
diff --git a/docs/data-sources/kafka_schema.md b/docs/data-sources/kafka_schema.md
index 34d55ec1c..95863ee08 100644
--- a/docs/data-sources/kafka_schema.md
+++ b/docs/data-sources/kafka_schema.md
@@ -32,6 +32,6 @@ data "aiven_kafka_schema_configuration" "config" {
 
 - `compatibility_level` (String) Kafka Schemas compatibility level. The possible values are `BACKWARD`, `BACKWARD_TRANSITIVE`, `FORWARD`, `FORWARD_TRANSITIVE`, `FULL`, `FULL_TRANSITIVE` and `NONE`.
 - `id` (String) The ID of this resource.
-- `schema` (String) Kafka Schema configuration should be a valid Avro Schema JSON format.
-- `schema_type` (String) Kafka Schema type JSON or AVRO
+- `schema` (String) Kafka Schema configuration. Should be a valid Avro, JSON, or Protobuf schema, depending on the schema type.
+- `schema_type` (String) Kafka Schema configuration type. Defaults to AVRO. Possible values are AVRO, JSON, and PROTOBUF.
 - `version` (Number) Kafka Schema configuration version.
diff --git a/docs/data-sources/kafka_schema_configuration.md b/docs/data-sources/kafka_schema_configuration.md
index 43d4bdaea..4a95fad86 100644
--- a/docs/data-sources/kafka_schema_configuration.md
+++ b/docs/data-sources/kafka_schema_configuration.md
@@ -32,7 +32,7 @@ resource "aiven_kafka_schema_configuration" "config" {
 
 - `compatibility_level` (String) Kafka Schemas compatibility level. The possible values are `BACKWARD`, `BACKWARD_TRANSITIVE`, `FORWARD`, `FORWARD_TRANSITIVE`, `FULL`, `FULL_TRANSITIVE` and `NONE`.
 - `id` (String) The ID of this resource.
-- `schema` (String) Kafka Schema configuration should be a valid Avro Schema JSON format.
-- `schema_type` (String) Kafka Schema type JSON or AVRO
+- `schema` (String) Kafka Schema configuration. Should be a valid Avro, JSON, or Protobuf schema, depending on the schema type.
+- `schema_type` (String) Kafka Schema configuration type. Defaults to AVRO. Possible values are AVRO, JSON, and PROTOBUF.
 - `subject_name` (String) The Kafka Schema Subject name. This property cannot be changed, doing so forces recreation of the resource.
 - `version` (Number) Kafka Schema configuration version.
diff --git a/docs/resources/kafka_schema.md b/docs/resources/kafka_schema.md
index 5c15c8f45..aa2097477 100644
--- a/docs/resources/kafka_schema.md
+++ b/docs/resources/kafka_schema.md
@@ -43,14 +43,14 @@ resource "aiven_kafka_schema" "kafka-schema1" {
 ### Required
 
 - `project` (String) Identifies the project this resource belongs to. To set up proper dependencies please refer to this variable as a reference. This property cannot be changed, doing so forces recreation of the resource.
-- `schema` (String) Kafka Schema configuration should be a valid Avro Schema JSON format.
+- `schema` (String) Kafka Schema configuration. Should be a valid Avro, JSON, or Protobuf schema, depending on the schema type.
 - `service_name` (String) Specifies the name of the service that this resource belongs to. To set up proper dependencies please refer to this variable as a reference. This property cannot be changed, doing so forces recreation of the resource.
 - `subject_name` (String) The Kafka Schema Subject name. This property cannot be changed, doing so forces recreation of the resource.
 
 ### Optional
 
 - `compatibility_level` (String) Kafka Schemas compatibility level. The possible values are `BACKWARD`, `BACKWARD_TRANSITIVE`, `FORWARD`, `FORWARD_TRANSITIVE`, `FULL`, `FULL_TRANSITIVE` and `NONE`.
-- `schema_type` (String) Kafka Schema type JSON or AVRO
+- `schema_type` (String) Kafka Schema configuration type. Defaults to AVRO. Possible values are AVRO, JSON, and PROTOBUF.
 - `timeouts` (Block, Optional) (see [below for nested schema](#nestedblock--timeouts))
 
 ### Read-Only
diff --git a/internal/sdkprovider/service/kafkaschema/kafka_schema.go b/internal/sdkprovider/service/kafkaschema/kafka_schema.go
index 956685868..76deaee08 100644
--- a/internal/sdkprovider/service/kafkaschema/kafka_schema.go
+++ b/internal/sdkprovider/service/kafkaschema/kafka_schema.go
@@ -5,6 +5,7 @@ import (
 	"encoding/json"
 	"fmt"
 	"reflect"
+	"regexp"
 
 	"github.com/aiven/aiven-go-client/v2"
 	"github.com/hashicorp/terraform-plugin-sdk/v2/diag"
@@ -16,6 +17,29 @@ import (
 	"github.com/aiven/terraform-provider-aiven/internal/schemautil/userconfig"
 )
 
+// protobufHeaderRegExp is a regular expression that matches a valid Protobuf header.
+var protobufHeaderRegExp = regexp.MustCompile("syntax(.*|)=(.*|)proto3(.*|);")
+
+// newlineRegExp is a regular expression that matches a newline.
+var newlineRegExp = regexp.MustCompile(`\r?\n`)
+
+// validateStringHasProtobufHeader is a schema.SchemaValidateFunc that ensures a string contains a valid Protobuf
+// header.
+func validateStringHasProtobufHeader(i any, k string) (warnings []string, errors []error) {
+	v, ok := i.(string)
+	if !ok {
+		errors = append(errors, fmt.Errorf("expected type of %s to be string", k))
+
+		return warnings, errors
+	}
+
+	if ok := protobufHeaderRegExp.Match([]byte(v)); !ok {
+		errors = append(errors, fmt.Errorf("%q contains an invalid Protobuf", k))
+	}
+
+	return warnings, errors
+}
+
 var aivenKafkaSchemaSchema = map[string]*schema.Schema{
 	"project":      schemautil.CommonSchemaProjectReference,
 	"service_name": schemautil.CommonSchemaServiceNameReference,
@@ -28,18 +52,20 @@ var aivenKafkaSchemaSchema = map[string]*schema.Schema{
 	"schema": {
 		Type:             schema.TypeString,
 		Required:         true,
-		ValidateFunc:     validation.StringIsJSON,
-		StateFunc:        normalizeJSONString,
-		DiffSuppressFunc: diffSuppressJSONObject,
-		Description:      "Kafka Schema configuration should be a valid Avro Schema JSON format.",
+		ValidateFunc:     validation.Any(validation.StringIsJSON, validateStringHasProtobufHeader),
+		StateFunc:        normalizeJSONOrProtobufString,
+		DiffSuppressFunc: diffSuppressJSONObjectOrProtobufString,
+		Description: "Kafka Schema configuration. Should be a valid Avro, JSON, or Protobuf schema," +
+			" depending on the schema type.",
 	},
 	"schema_type": {
-		Type:        schema.TypeString,
-		Optional:    true,
-		ForceNew:    true,
-		Description: "Kafka Schema type JSON or AVRO",
+		Type:     schema.TypeString,
+		Optional: true,
+		ForceNew: true,
+		Description: "Kafka Schema configuration type. Defaults to AVRO. Possible values are AVRO, JSON, " +
+			"and PROTOBUF.",
 		Default:      "AVRO",
-		ValidateFunc: validation.StringInSlice([]string{"AVRO", "JSON"}, false),
+		ValidateFunc: validation.StringInSlice([]string{"AVRO", "JSON", "PROTOBUF"}, false),
 		DiffSuppressFunc: func(k, oldValue, newValue string, d *schema.ResourceData) bool {
 			// This field can't be retrieved once resource is created.
 			// That produces a diff on plan on resource import.
@@ -79,6 +105,15 @@ func diffSuppressJSONObject(_, old, new string, _ *schema.ResourceData) bool {
 	return reflect.DeepEqual(objNew, objOld)
 }
 
+// diffSuppressJSONObjectOrProtobufString checks logical equivalences in JSON or Protobuf Kafka Schema values.
+func diffSuppressJSONObjectOrProtobufString(k, old, new string, d *schema.ResourceData) bool {
+	if ok := protobufHeaderRegExp.Match([]byte(old)); ok {
+		return normalizeProtobufString(old) == normalizeProtobufString(new)
+	}
+
+	return diffSuppressJSONObject(k, old, new, d)
+}
+
 // normalizeJSONString returns normalized JSON string
 func normalizeJSONString(v interface{}) string {
 	jsonString, _ := structure.NormalizeJsonString(v)
@@ -86,6 +121,24 @@ func normalizeJSONString(v interface{}) string {
 	return jsonString
 }
 
+// normalizeProtobufString returns normalized Protobuf string.
+func normalizeProtobufString(i any) string {
+	v := i.(string)
+
+	return newlineRegExp.ReplaceAllString(v, "")
+}
+
+// normalizeJSONOrProtobufString returns normalized JSON or Protobuf string.
+func normalizeJSONOrProtobufString(i any) string {
+	v := i.(string)
+
+	if ok := protobufHeaderRegExp.Match([]byte(v)); ok {
+		return normalizeProtobufString(v)
+	}
+
+	return normalizeJSONString(v)
+}
+
 func ResourceKafkaSchema() *schema.Resource {
 	return &schema.Resource{
 		Description: "The Kafka Schema resource allows the creation and management of Aiven Kafka Schemas.",
diff --git a/internal/sdkprovider/service/kafkaschema/kafka_schema_test.go b/internal/sdkprovider/service/kafkaschema/kafka_schema_test.go
index 7d3725b2e..b7bc16c8e 100644
--- a/internal/sdkprovider/service/kafkaschema/kafka_schema_test.go
+++ b/internal/sdkprovider/service/kafkaschema/kafka_schema_test.go
@@ -115,6 +115,32 @@ func TestAccAivenKafkaSchema_json_basic(t *testing.T) {
 		},
 	})
 }
+
+// TestAccAivenKafkaSchema_protobuf_basic is a test for Protobuf schema Kafka Schema resource.
+func TestAccAivenKafkaSchema_protobuf_basic(t *testing.T) {
+	resourceName := "aiven_kafka_schema.foo"
+	rName := acctest.RandStringFromCharSet(10, acctest.CharSetAlphaNum)
+
+	resource.ParallelTest(t, resource.TestCase{
+		PreCheck:                 func() { acc.TestAccPreCheck(t) },
+		ProtoV6ProviderFactories: acc.TestProtoV6ProviderFactories,
+		CheckDestroy:             testAccCheckAivenKafkaSchemaResourceDestroy,
+		Steps: []resource.TestStep{
+			{
+				Config: testAccKafkaSchemaProtobufResource(rName),
+				Check: resource.ComposeTestCheckFunc(
+					testAccCheckAivenKafkaSchemaAttributes("data.aiven_kafka_schema.schema"),
+					resource.TestCheckResourceAttr(resourceName, "project", os.Getenv("AIVEN_PROJECT_NAME")),
+					resource.TestCheckResourceAttr(resourceName, "service_name", fmt.Sprintf("test-acc-sr-%s", rName)),
+					resource.TestCheckResourceAttr(resourceName, "subject_name", fmt.Sprintf("kafka-schema-%s", rName)),
+					resource.TestCheckResourceAttr(resourceName, "version", "1"),
+					resource.TestCheckResourceAttr(resourceName, "schema_type", "PROTOBUF"),
+				),
+			},
+		},
+	})
+}
+
 func TestAccAivenKafkaSchema_basic(t *testing.T) {
 	resourceName := "aiven_kafka_schema.foo"
 	rName := acctest.RandStringFromCharSet(10, acctest.CharSetAlphaNum)
@@ -269,6 +295,62 @@ data "aiven_kafka_schema" "schema" {
 }`, os.Getenv("AIVEN_PROJECT_NAME"), name, name)
 }
 
+// testAccKafkaSchemaProtobufResource is a test resource for Protobuf schema Kafka Schema resource.
+func testAccKafkaSchemaProtobufResource(name string) string {
+	return fmt.Sprintf(`
+data "aiven_project" "foo" {
+  project = "%s"
+}
+
+resource "aiven_kafka" "bar" {
+  project                 = data.aiven_project.foo.project
+  cloud_name              = "google-europe-west1"
+  plan                    = "startup-2"
+  service_name            = "test-acc-sr-%s"
+  maintenance_window_dow  = "monday"
+  maintenance_window_time = "10:00:00"
+
+  kafka_user_config {
+    schema_registry = true
+
+    kafka {
+      group_max_session_timeout_ms = 70000
+      log_retention_bytes          = 1000000000
+    }
+  }
+}
+
+resource "aiven_kafka_schema_configuration" "foo" {
+  project             = aiven_kafka.bar.project
+  service_name        = aiven_kafka.bar.service_name
+  compatibility_level = "BACKWARD"
+}
+
+resource "aiven_kafka_schema" "foo" {
+  project      = aiven_kafka_schema_configuration.foo.project
+  service_name = aiven_kafka_schema_configuration.foo.service_name
+  subject_name = "kafka-schema-%s"
+  schema_type  = "PROTOBUF"
+
+  schema = <
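
Not part of the patch: a minimal standalone sketch of how the two regular expressions introduced above decide which code path a schema value takes. The sample schema strings are illustrative only (the Protobuf fixture in the patch is truncated here). A value carrying a `proto3` syntax header is treated as Protobuf and normalized by stripping newlines; anything else falls through to the existing JSON handling, so for example a proto2 definition would not be recognized as Protobuf by this check.

```go
package main

import (
	"fmt"
	"regexp"
)

// Same patterns as the patch: a schema value is treated as Protobuf when it
// carries a proto3 syntax header; otherwise the JSON code path is used.
var (
	protobufHeaderRegExp = regexp.MustCompile("syntax(.*|)=(.*|)proto3(.*|);")
	newlineRegExp        = regexp.MustCompile(`\r?\n`)
)

func main() {
	// Illustrative inputs only; none of these are taken from the patch.
	protoSchema := "syntax = \"proto3\";\n\nmessage Example {\n  string name = 1;\n}\n"
	avroSchema := `{"type": "record", "name": "Example", "fields": [{"name": "name", "type": "string"}]}`
	proto2Schema := "syntax = \"proto2\";\nmessage Legacy { optional string name = 1; }"

	for _, s := range []string{protoSchema, avroSchema, proto2Schema} {
		if protobufHeaderRegExp.MatchString(s) {
			// Protobuf values are normalized by stripping newlines before they
			// are stored in state or compared for diff suppression.
			fmt.Printf("protobuf path, normalized: %q\n", newlineRegExp.ReplaceAllString(s, ""))
		} else {
			fmt.Println("JSON path (Avro/JSON schema, or rejected if not valid JSON)")
		}
	}
}
```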
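Also not part of the patch: a sketch, under the same assumptions, of the diff-suppression idea behind `diffSuppressJSONObjectOrProtobufString` without the Terraform SDK. Protobuf values are considered equal when they differ only by newlines, while JSON values are compared structurally after unmarshalling; the `equivalentSchemas` helper and its inputs are hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
	"reflect"
	"regexp"
)

var (
	protobufHeaderRegExp = regexp.MustCompile("syntax(.*|)=(.*|)proto3(.*|);")
	newlineRegExp        = regexp.MustCompile(`\r?\n`)
)

// equivalentSchemas mirrors the patch's diff-suppression approach: Protobuf
// values are equal when they only differ by newlines, JSON values are equal
// when they unmarshal to the same structure.
func equivalentSchemas(old, new string) bool {
	if protobufHeaderRegExp.MatchString(old) {
		return newlineRegExp.ReplaceAllString(old, "") == newlineRegExp.ReplaceAllString(new, "")
	}

	var oldObj, newObj interface{}
	if err := json.Unmarshal([]byte(old), &oldObj); err != nil {
		return false
	}
	if err := json.Unmarshal([]byte(new), &newObj); err != nil {
		return false
	}

	return reflect.DeepEqual(oldObj, newObj)
}

func main() {
	// The same message definition with different line breaks.
	a := "syntax = \"proto3\";\nmessage Example { string name = 1; }\n"
	b := "syntax = \"proto3\";\nmessage Example { string name = 1; }"

	fmt.Println(equivalentSchemas(a, b))                      // true: newline-only differences are suppressed
	fmt.Println(equivalentSchemas(`{"a": 1}`, `{ "a": 1 }`))  // true: structural JSON comparison
	fmt.Println(equivalentSchemas(`{"a": 1}`, `{"a": 2}`))    // false
}
```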