From 7466e79aa5cdf02a9896d09394da0d566d04f724 Mon Sep 17 00:00:00 2001 From: Murad Biashimov Date: Wed, 10 Jan 2024 12:20:33 +0100 Subject: [PATCH] fix(kafka_topic): config optional fields --- CHANGELOG.md | 2 + go.mod | 2 +- go.sum | 4 +- .../service/kafkatopic/kafka_topic.go | 408 +++++++++--------- .../service/kafkatopic/kafka_topic_test.go | 119 +++++ 5 files changed, 333 insertions(+), 202 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 97c8a1304..8e5d15f74 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,8 @@ nav_order: 1 - Add organization application users support - Configure "insufficient broker" error retries timeout +- Enable `local_retention_*` fields in `aiven_kafka_topic` resource +- Validate `local_retention_bytes` is not bigger than `retention_bytes` ## [4.12.1] - 2024-01-05 diff --git a/go.mod b/go.mod index 5721ede1d..3eac32e27 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module github.com/aiven/terraform-provider-aiven go 1.21 require ( - github.com/aiven/aiven-go-client/v2 v2.9.0 + github.com/aiven/aiven-go-client/v2 v2.9.1-0.20240110103049-b4535919524f github.com/avast/retry-go v3.0.0+incompatible github.com/dave/jennifer v1.7.0 github.com/docker/go-units v0.5.0 diff --git a/go.sum b/go.sum index d67bb9fe9..e39f37525 100644 --- a/go.sum +++ b/go.sum @@ -197,8 +197,8 @@ github.com/ProtonMail/go-crypto v0.0.0-20230828082145-3c4c8a2d2371/go.mod h1:EjA github.com/agext/levenshtein v1.2.1/go.mod h1:JEDfjyjHDjOF/1e4FlBE/PkbqA9OfWu2ki2W0IB5558= github.com/agext/levenshtein v1.2.3 h1:YB2fHEn0UJagG8T1rrWknE3ZQzWM06O8AMAatNn7lmo= github.com/agext/levenshtein v1.2.3/go.mod h1:JEDfjyjHDjOF/1e4FlBE/PkbqA9OfWu2ki2W0IB5558= -github.com/aiven/aiven-go-client/v2 v2.9.0 h1:LpfilLbahoCzDglh77jbwD5L8sJI3p7BvuZ/NXZoA8w= -github.com/aiven/aiven-go-client/v2 v2.9.0/go.mod h1:x0xhzxWEKAwKv0xY5FvECiI6tesWshcPHvjwl0B/1SU= +github.com/aiven/aiven-go-client/v2 v2.9.1-0.20240110103049-b4535919524f h1:UaxrO2fczUHb5rN7nfIWH+Syv9gj58Rc3QRt7Fjy/NQ= +github.com/aiven/aiven-go-client/v2 v2.9.1-0.20240110103049-b4535919524f/go.mod h1:x0xhzxWEKAwKv0xY5FvECiI6tesWshcPHvjwl0B/1SU= github.com/aiven/go-api-schemas v1.52.0 h1:kCv4kNHGX2EvyKjoNWTfr9vIY6K5nNBn+A8h0bLWAjQ= github.com/aiven/go-api-schemas v1.52.0/go.mod h1:/bPxBUHza/2Aeer6hIIdB++GxKiw9K1KCBtRa2rtZ5I= github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= diff --git a/internal/sdkprovider/service/kafkatopic/kafka_topic.go b/internal/sdkprovider/service/kafkatopic/kafka_topic.go index 024399255..aff0a7364 100644 --- a/internal/sdkprovider/service/kafkatopic/kafka_topic.go +++ b/internal/sdkprovider/service/kafkatopic/kafka_topic.go @@ -2,7 +2,9 @@ package kafkatopic import ( "context" + "encoding/json" "errors" + "fmt" "log" "github.com/aiven/aiven-go-client/v2" @@ -16,6 +18,174 @@ import ( "github.com/aiven/terraform-provider-aiven/internal/sdkprovider/kafkatopicrepository" ) +var errLocalRetentionBytes = fmt.Errorf("local_retention_bytes must not be more than retention_bytes value") + +var aivenKafkaTopicConfigSchema = map[string]*schema.Schema{ + "cleanup_policy": { + Type: schema.TypeString, + Description: "cleanup.policy value", + Optional: true, + DiffSuppressFunc: schemautil.EmptyObjectDiffSuppressFunc, + }, + "compression_type": { + Type: schema.TypeString, + Description: "compression.type value", + Optional: true, + DiffSuppressFunc: schemautil.EmptyObjectDiffSuppressFunc, + }, + "delete_retention_ms": { + Type: schema.TypeString, + Description: "delete.retention.ms value", + Optional: true, + DiffSuppressFunc: schemautil.EmptyObjectDiffSuppressFunc, + }, + "file_delete_delay_ms": { + Type: schema.TypeString, + Description: "file.delete.delay.ms value", + Optional: true, + DiffSuppressFunc: schemautil.EmptyObjectDiffSuppressFunc, + }, + "flush_messages": { + Type: schema.TypeString, + Description: "flush.messages value", + Optional: true, + DiffSuppressFunc: schemautil.EmptyObjectDiffSuppressFunc, + }, + "flush_ms": { + Type: schema.TypeString, + Description: "flush.ms value", + Optional: true, + DiffSuppressFunc: schemautil.EmptyObjectDiffSuppressFunc, + }, + "index_interval_bytes": { + Type: schema.TypeString, + Description: "index.interval.bytes value", + Optional: true, + DiffSuppressFunc: schemautil.EmptyObjectDiffSuppressFunc, + }, + "max_compaction_lag_ms": { + Type: schema.TypeString, + Description: "max.compaction.lag.ms value", + Optional: true, + DiffSuppressFunc: schemautil.EmptyObjectDiffSuppressFunc, + }, + "max_message_bytes": { + Type: schema.TypeString, + Description: "max.message.bytes value", + Optional: true, + DiffSuppressFunc: schemautil.EmptyObjectDiffSuppressFunc, + }, + "message_downconversion_enable": { + Type: schema.TypeBool, + Description: "message.downconversion.enable value", + Optional: true, + DiffSuppressFunc: schemautil.EmptyObjectDiffSuppressFunc, + }, + "message_format_version": { + Type: schema.TypeString, + Description: "message.format.version value", + Optional: true, + DiffSuppressFunc: schemautil.EmptyObjectDiffSuppressFunc, + }, + "message_timestamp_difference_max_ms": { + Type: schema.TypeString, + Description: "message.timestamp.difference.max.ms value", + Optional: true, + DiffSuppressFunc: schemautil.EmptyObjectDiffSuppressFunc, + }, + "message_timestamp_type": { + Type: schema.TypeString, + Description: "message.timestamp.type value", + Optional: true, + DiffSuppressFunc: schemautil.EmptyObjectDiffSuppressFunc, + }, + "min_cleanable_dirty_ratio": { + Type: schema.TypeFloat, + Description: "min.cleanable.dirty.ratio value", + Optional: true, + DiffSuppressFunc: schemautil.EmptyObjectDiffSuppressFunc, + }, + "min_compaction_lag_ms": { + Type: schema.TypeString, + Description: "min.compaction.lag.ms value", + Optional: true, + DiffSuppressFunc: schemautil.EmptyObjectDiffSuppressFunc, + }, + "min_insync_replicas": { + Type: schema.TypeString, + Description: "min.insync.replicas value", + Optional: true, + DiffSuppressFunc: schemautil.EmptyObjectDiffSuppressFunc, + }, + "preallocate": { + Type: schema.TypeBool, + Description: "preallocate value", + Optional: true, + DiffSuppressFunc: schemautil.EmptyObjectDiffSuppressFunc, + }, + "retention_bytes": { + Type: schema.TypeString, + Description: "retention.bytes value", + Optional: true, + DiffSuppressFunc: schemautil.EmptyObjectDiffSuppressFunc, + }, + "retention_ms": { + Type: schema.TypeString, + Description: "retention.ms value", + Optional: true, + DiffSuppressFunc: schemautil.EmptyObjectDiffSuppressFunc, + }, + "segment_bytes": { + Type: schema.TypeString, + Description: "segment.bytes value", + Optional: true, + DiffSuppressFunc: schemautil.EmptyObjectDiffSuppressFunc, + }, + "segment_index_bytes": { + Type: schema.TypeString, + Description: "segment.index.bytes value", + Optional: true, + DiffSuppressFunc: schemautil.EmptyObjectDiffSuppressFunc, + }, + "segment_jitter_ms": { + Type: schema.TypeString, + Description: "segment.jitter.ms value", + Optional: true, + DiffSuppressFunc: schemautil.EmptyObjectDiffSuppressFunc, + }, + "segment_ms": { + Type: schema.TypeString, + Description: "segment.ms value", + Optional: true, + DiffSuppressFunc: schemautil.EmptyObjectDiffSuppressFunc, + }, + "unclean_leader_election_enable": { + Type: schema.TypeBool, + Description: "unclean.leader.election.enable value; This field is deprecated and no longer functional.", + Optional: true, + DiffSuppressFunc: schemautil.EmptyObjectDiffSuppressFunc, + Deprecated: "This field is deprecated and no longer functional.", + }, + "remote_storage_enable": { + Type: schema.TypeBool, + Description: "remote.storage.enable value", + Optional: true, + DiffSuppressFunc: schemautil.EmptyObjectDiffSuppressFunc, + }, + "local_retention_bytes": { + Type: schema.TypeString, + Description: "local.retention.bytes value. This field is temporarily disabled.", + Optional: true, + DiffSuppressFunc: schemautil.EmptyObjectDiffSuppressFunc, + }, + "local_retention_ms": { + Type: schema.TypeString, + Description: "local.retention.ms value. This field is temporarily disabled.", + Optional: true, + DiffSuppressFunc: schemautil.EmptyObjectDiffSuppressFunc, + }, +} + var aivenKafkaTopicSchema = map[string]*schema.Schema{ "project": schemautil.CommonSchemaProjectReference, "service_name": schemautil.CommonSchemaServiceNameReference, @@ -70,171 +240,7 @@ var aivenKafkaTopicSchema = map[string]*schema.Schema{ MaxItems: 1, DiffSuppressFunc: schemautil.EmptyObjectDiffSuppressFunc, Elem: &schema.Resource{ - Schema: map[string]*schema.Schema{ - "cleanup_policy": { - Type: schema.TypeString, - Description: "cleanup.policy value", - Optional: true, - DiffSuppressFunc: schemautil.EmptyObjectDiffSuppressFunc, - }, - "compression_type": { - Type: schema.TypeString, - Description: "compression.type value", - Optional: true, - DiffSuppressFunc: schemautil.EmptyObjectDiffSuppressFunc, - }, - "delete_retention_ms": { - Type: schema.TypeString, - Description: "delete.retention.ms value", - Optional: true, - DiffSuppressFunc: schemautil.EmptyObjectDiffSuppressFunc, - }, - "file_delete_delay_ms": { - Type: schema.TypeString, - Description: "file.delete.delay.ms value", - Optional: true, - DiffSuppressFunc: schemautil.EmptyObjectDiffSuppressFunc, - }, - "flush_messages": { - Type: schema.TypeString, - Description: "flush.messages value", - Optional: true, - DiffSuppressFunc: schemautil.EmptyObjectDiffSuppressFunc, - }, - "flush_ms": { - Type: schema.TypeString, - Description: "flush.ms value", - Optional: true, - DiffSuppressFunc: schemautil.EmptyObjectDiffSuppressFunc, - }, - "index_interval_bytes": { - Type: schema.TypeString, - Description: "index.interval.bytes value", - Optional: true, - DiffSuppressFunc: schemautil.EmptyObjectDiffSuppressFunc, - }, - "max_compaction_lag_ms": { - Type: schema.TypeString, - Description: "max.compaction.lag.ms value", - Optional: true, - DiffSuppressFunc: schemautil.EmptyObjectDiffSuppressFunc, - }, - "max_message_bytes": { - Type: schema.TypeString, - Description: "max.message.bytes value", - Optional: true, - DiffSuppressFunc: schemautil.EmptyObjectDiffSuppressFunc, - }, - "message_downconversion_enable": { - Type: schema.TypeBool, - Description: "message.downconversion.enable value", - Optional: true, - DiffSuppressFunc: schemautil.EmptyObjectDiffSuppressFunc, - }, - "message_format_version": { - Type: schema.TypeString, - Description: "message.format.version value", - Optional: true, - DiffSuppressFunc: schemautil.EmptyObjectDiffSuppressFunc, - }, - "message_timestamp_difference_max_ms": { - Type: schema.TypeString, - Description: "message.timestamp.difference.max.ms value", - Optional: true, - DiffSuppressFunc: schemautil.EmptyObjectDiffSuppressFunc, - }, - "message_timestamp_type": { - Type: schema.TypeString, - Description: "message.timestamp.type value", - Optional: true, - DiffSuppressFunc: schemautil.EmptyObjectDiffSuppressFunc, - }, - "min_cleanable_dirty_ratio": { - Type: schema.TypeFloat, - Description: "min.cleanable.dirty.ratio value", - Optional: true, - DiffSuppressFunc: schemautil.EmptyObjectDiffSuppressFunc, - }, - "min_compaction_lag_ms": { - Type: schema.TypeString, - Description: "min.compaction.lag.ms value", - Optional: true, - DiffSuppressFunc: schemautil.EmptyObjectDiffSuppressFunc, - }, - "min_insync_replicas": { - Type: schema.TypeString, - Description: "min.insync.replicas value", - Optional: true, - DiffSuppressFunc: schemautil.EmptyObjectDiffSuppressFunc, - }, - "preallocate": { - Type: schema.TypeBool, - Description: "preallocate value", - Optional: true, - DiffSuppressFunc: schemautil.EmptyObjectDiffSuppressFunc, - }, - "retention_bytes": { - Type: schema.TypeString, - Description: "retention.bytes value", - Optional: true, - DiffSuppressFunc: schemautil.EmptyObjectDiffSuppressFunc, - }, - "retention_ms": { - Type: schema.TypeString, - Description: "retention.ms value", - Optional: true, - DiffSuppressFunc: schemautil.EmptyObjectDiffSuppressFunc, - }, - "segment_bytes": { - Type: schema.TypeString, - Description: "segment.bytes value", - Optional: true, - DiffSuppressFunc: schemautil.EmptyObjectDiffSuppressFunc, - }, - "segment_index_bytes": { - Type: schema.TypeString, - Description: "segment.index.bytes value", - Optional: true, - DiffSuppressFunc: schemautil.EmptyObjectDiffSuppressFunc, - }, - "segment_jitter_ms": { - Type: schema.TypeString, - Description: "segment.jitter.ms value", - Optional: true, - DiffSuppressFunc: schemautil.EmptyObjectDiffSuppressFunc, - }, - "segment_ms": { - Type: schema.TypeString, - Description: "segment.ms value", - Optional: true, - DiffSuppressFunc: schemautil.EmptyObjectDiffSuppressFunc, - }, - "unclean_leader_election_enable": { - Type: schema.TypeBool, - Description: "unclean.leader.election.enable value; This field is deprecated and no longer functional.", - Optional: true, - DiffSuppressFunc: schemautil.EmptyObjectDiffSuppressFunc, - Deprecated: "This field is deprecated and no longer functional.", - }, - "remote_storage_enable": { - Type: schema.TypeBool, - Description: "remote.storage.enable value", - Optional: true, - DiffSuppressFunc: schemautil.EmptyObjectDiffSuppressFunc, - }, - "local_retention_bytes": { - Type: schema.TypeString, - Description: "local.retention.bytes value. This field is temporarily disabled.", - Optional: true, - DiffSuppressFunc: schemautil.EmptyObjectDiffSuppressFunc, - }, - "local_retention_ms": { - Type: schema.TypeString, - Description: "local.retention.ms value. This field is temporarily disabled.", - Optional: true, - DiffSuppressFunc: schemautil.EmptyObjectDiffSuppressFunc, - }, - }, + Schema: aivenKafkaTopicConfigSchema, }, }, } @@ -270,6 +276,12 @@ func ResourceKafkaTopic() *schema.Resource { return errors.New("number of partitions cannot be decreased") } + retentionBytes, rOK := d.GetOk("config.0.retention_bytes") + localRetentionBytes, lOK := d.GetOk("config.0.local_retention_bytes") + if rOK && lOK && *schemautil.ParseOptionalStringToInt64(retentionBytes) < *schemautil.ParseOptionalStringToInt64(localRetentionBytes) { + return errLocalRetentionBytes + } + return nil }, } @@ -356,8 +368,8 @@ func getKafkaTopicConfig(d *schema.ResourceData) aiven.KafkaTopicConfig { SegmentMs: schemautil.ParseOptionalStringToInt64(configRaw["segment_ms"]), UncleanLeaderElectionEnable: schemautil.OptionalBoolPointer(d, "config.0.unclean_leader_election_enable"), RemoteStorageEnable: schemautil.OptionalBoolPointer(d, "config.0.remote_storage_enable"), - //LocalRetentionBytes: schemautil.ParseOptionalStringToInt64(configRaw["local_retention_bytes"]), - //LocalRetentionMs: schemautil.ParseOptionalStringToInt64(configRaw["local_retention_ms"]), + LocalRetentionBytes: schemautil.ParseOptionalStringToInt64(configRaw["local_retention_bytes"]), + LocalRetentionMs: schemautil.ParseOptionalStringToInt64(configRaw["local_retention_ms"]), } } @@ -404,7 +416,12 @@ func resourceKafkaTopicRead(ctx context.Context, d *schema.ResourceData, m inter if err := d.Set("replication", topic.Replication); err != nil { return diag.FromErr(err) } - if err := d.Set("config", flattenKafkaTopicConfig(topic)); err != nil { + + config, err := FlattenKafkaTopicConfig(topic) + if err != nil { + return diag.FromErr(err) + } + if err := d.Set("config", config); err != nil { return diag.FromErr(err) } @@ -484,36 +501,29 @@ func resourceKafkaTopicDelete(ctx context.Context, d *schema.ResourceData, m int return nil } -func flattenKafkaTopicConfig(t *aiven.KafkaTopic) []map[string]interface{} { - return []map[string]interface{}{ - { - "cleanup_policy": schemautil.ToOptionalString(t.Config.CleanupPolicy.Value), - "compression_type": schemautil.ToOptionalString(t.Config.CompressionType.Value), - "delete_retention_ms": schemautil.ToOptionalString(t.Config.DeleteRetentionMs.Value), - "file_delete_delay_ms": schemautil.ToOptionalString(t.Config.FileDeleteDelayMs.Value), - "flush_messages": schemautil.ToOptionalString(t.Config.FlushMessages.Value), - "flush_ms": schemautil.ToOptionalString(t.Config.FlushMs.Value), - "index_interval_bytes": schemautil.ToOptionalString(t.Config.IndexIntervalBytes.Value), - "max_compaction_lag_ms": schemautil.ToOptionalString(t.Config.MaxCompactionLagMs.Value), - "max_message_bytes": schemautil.ToOptionalString(t.Config.MaxMessageBytes.Value), - "message_downconversion_enable": t.Config.MessageDownconversionEnable.Value, - "message_format_version": schemautil.ToOptionalString(t.Config.MessageFormatVersion.Value), - "message_timestamp_difference_max_ms": schemautil.ToOptionalString(t.Config.MessageTimestampDifferenceMaxMs.Value), - "message_timestamp_type": schemautil.ToOptionalString(t.Config.MessageTimestampType.Value), - "min_cleanable_dirty_ratio": t.Config.MinCleanableDirtyRatio.Value, - "min_compaction_lag_ms": schemautil.ToOptionalString(t.Config.MinCompactionLagMs.Value), - "min_insync_replicas": schemautil.ToOptionalString(t.Config.MinInsyncReplicas.Value), - "preallocate": t.Config.Preallocate.Value, - "retention_bytes": schemautil.ToOptionalString(t.Config.RetentionBytes.Value), - "retention_ms": schemautil.ToOptionalString(t.Config.RetentionMs.Value), - "segment_bytes": schemautil.ToOptionalString(t.Config.SegmentBytes.Value), - "segment_index_bytes": schemautil.ToOptionalString(t.Config.SegmentIndexBytes.Value), - "segment_jitter_ms": schemautil.ToOptionalString(t.Config.SegmentJitterMs.Value), - "segment_ms": schemautil.ToOptionalString(t.Config.SegmentMs.Value), - "unclean_leader_election_enable": t.Config.UncleanLeaderElectionEnable.Value, - "remote_storage_enable": t.Config.RemoteStorageEnable.Value, - //"local_retention_bytes": schemautil.ToOptionalString(t.Config.LocalRetentionBytes.Value), - //"local_retention_ms": schemautil.ToOptionalString(t.Config.LocalRetentionMs.Value), - }, +func FlattenKafkaTopicConfig(t *aiven.KafkaTopic) ([]map[string]interface{}, error) { + source := make(map[string]struct { + Value any `json:"value"` + }) + + data, err := json.Marshal(t.Config) + if err != nil { + return nil, err + } + + err = json.Unmarshal(data, &source) + if err != nil { + return nil, err } + + config := make(map[string]any) + for k, v := range source { + if aivenKafkaTopicConfigSchema[k].Type == schema.TypeString { + config[k] = schemautil.ToOptionalString(v.Value) + } else { + config[k] = v.Value + } + } + + return []map[string]interface{}{config}, nil } diff --git a/internal/sdkprovider/service/kafkatopic/kafka_topic_test.go b/internal/sdkprovider/service/kafkatopic/kafka_topic_test.go index 6a949c940..253a9d754 100644 --- a/internal/sdkprovider/service/kafkatopic/kafka_topic_test.go +++ b/internal/sdkprovider/service/kafkatopic/kafka_topic_test.go @@ -11,6 +11,7 @@ import ( "time" "github.com/aiven/aiven-go-client/v2" + "github.com/google/go-cmp/cmp" "github.com/hashicorp/terraform-plugin-sdk/v2/helper/retry" "github.com/hashicorp/terraform-plugin-testing/helper/acctest" "github.com/hashicorp/terraform-plugin-testing/helper/resource" @@ -20,6 +21,7 @@ import ( acc "github.com/aiven/terraform-provider-aiven/internal/acctest" "github.com/aiven/terraform-provider-aiven/internal/schemautil" "github.com/aiven/terraform-provider-aiven/internal/sdkprovider/kafkatopicrepository" + "github.com/aiven/terraform-provider-aiven/internal/sdkprovider/service/kafkatopic" ) func TestAccAivenKafkaTopic_basic(t *testing.T) { @@ -584,3 +586,120 @@ func partitions(numPartitions int) (partitions []*aiven.Partition) { } return } + +func TestAccAivenKafkaTopic_local_retention_bytes_error(t *testing.T) { + project := os.Getenv("AIVEN_PROJECT_NAME") + resource.ParallelTest(t, resource.TestCase{ + PreCheck: func() { acc.TestAccPreCheck(t) }, + ProtoV6ProviderFactories: acc.TestProtoV6ProviderFactories, + CheckDestroy: testAccCheckAivenKafkaTopicResourceDestroy, + Steps: []resource.TestStep{ + { + Config: fmt.Sprintf(` +resource "aiven_kafka_topic" "topic" { + project = %q + service_name = "whataver" + topic_name = "whataver" + partitions = 5 + replication = 2 + + config { + local_retention_bytes = 3 + retention_bytes = 1 + } + +}`, project), + ExpectError: regexp.MustCompile(`local_retention_bytes must not be more than retention_bytes value`), + }, + }, + }) +} + +func TestFlattenKafkaTopicConfig(t *testing.T) { + cases := []struct { + name string + expect map[string]any + config aiven.KafkaTopicConfigResponse + }{ + { + name: "all fields", + expect: map[string]any{ + "cleanup_policy": "foo", + "compression_type": "bar", + "delete_retention_ms": "0", + "file_delete_delay_ms": "1", + "flush_messages": "2", + "flush_ms": "3", + "index_interval_bytes": "4", + "local_retention_bytes": "5", + "local_retention_ms": "6", + "max_compaction_lag_ms": "7", + "max_message_bytes": "8", + "message_downconversion_enable": false, + "message_format_version": "", + "message_timestamp_difference_max_ms": "0", + "message_timestamp_type": "", + "min_cleanable_dirty_ratio": 0.2, + "min_compaction_lag_ms": "0", + "min_insync_replicas": "0", + "preallocate": true, + "remote_storage_enable": false, + "retention_bytes": "0", + "retention_ms": "0", + "segment_bytes": "0", + "segment_index_bytes": "0", + "segment_jitter_ms": "0", + "segment_ms": "0", + "unclean_leader_election_enable": true, + }, + config: aiven.KafkaTopicConfigResponse{ + CleanupPolicy: &aiven.KafkaTopicConfigResponseString{Value: "foo"}, + CompressionType: &aiven.KafkaTopicConfigResponseString{Value: "bar"}, + DeleteRetentionMs: &aiven.KafkaTopicConfigResponseInt{Value: 0}, + FileDeleteDelayMs: &aiven.KafkaTopicConfigResponseInt{Value: 1}, + FlushMessages: &aiven.KafkaTopicConfigResponseInt{Value: 2}, + FlushMs: &aiven.KafkaTopicConfigResponseInt{Value: 3}, + IndexIntervalBytes: &aiven.KafkaTopicConfigResponseInt{Value: 4}, + LocalRetentionBytes: &aiven.KafkaTopicConfigResponseInt{Value: 5}, + LocalRetentionMs: &aiven.KafkaTopicConfigResponseInt{Value: 6}, + MaxCompactionLagMs: &aiven.KafkaTopicConfigResponseInt{Value: 7}, + MaxMessageBytes: &aiven.KafkaTopicConfigResponseInt{Value: 8}, + MessageDownconversionEnable: &aiven.KafkaTopicConfigResponseBool{Value: false}, + MessageFormatVersion: &aiven.KafkaTopicConfigResponseString{Value: ""}, + MessageTimestampDifferenceMaxMs: &aiven.KafkaTopicConfigResponseInt{Value: 0}, + MessageTimestampType: &aiven.KafkaTopicConfigResponseString{Value: ""}, + MinCleanableDirtyRatio: &aiven.KafkaTopicConfigResponseFloat{Value: 0.2}, + MinCompactionLagMs: &aiven.KafkaTopicConfigResponseInt{Value: 0}, + MinInsyncReplicas: &aiven.KafkaTopicConfigResponseInt{Value: 0}, + Preallocate: &aiven.KafkaTopicConfigResponseBool{Value: true}, + RemoteStorageEnable: &aiven.KafkaTopicConfigResponseBool{Value: false}, + RetentionBytes: &aiven.KafkaTopicConfigResponseInt{Value: 0}, + RetentionMs: &aiven.KafkaTopicConfigResponseInt{Value: 0}, + SegmentBytes: &aiven.KafkaTopicConfigResponseInt{Value: 0}, + SegmentIndexBytes: &aiven.KafkaTopicConfigResponseInt{Value: 0}, + SegmentJitterMs: &aiven.KafkaTopicConfigResponseInt{Value: 0}, + SegmentMs: &aiven.KafkaTopicConfigResponseInt{Value: 0}, + UncleanLeaderElectionEnable: &aiven.KafkaTopicConfigResponseBool{Value: true}, + }, + }, + { + name: "few fields", + expect: map[string]any{ + "local_retention_bytes": "1", + "retention_bytes": "2", + }, + config: aiven.KafkaTopicConfigResponse{ + LocalRetentionBytes: &aiven.KafkaTopicConfigResponseInt{Value: 1}, + RetentionBytes: &aiven.KafkaTopicConfigResponseInt{Value: 2}, + }, + }, + } + + for _, opt := range cases { + t.Run(opt.name, func(t *testing.T) { + result, err := kafkatopic.FlattenKafkaTopicConfig(&aiven.KafkaTopic{Config: opt.config}) + assert.NoError(t, err) + assert.Empty(t, cmp.Diff([]map[string]any{opt.expect}, result)) + }) + } +}