Commit

fix bugs, tests
dweinshenker committed Oct 20, 2023
1 parent 3b19a20 commit 0e874c2
Showing 2 changed files with 105 additions and 62 deletions.
120 changes: 70 additions & 50 deletions digitalocean/database/resource_database_kafka_topic.go
@@ -2,9 +2,11 @@ package database

import (
"context"
"errors"
"fmt"
"log"
"strconv"
"strings"

"github.com/digitalocean/godo"
"github.com/digitalocean/terraform-provider-digitalocean/digitalocean/config"
@@ -20,7 +22,7 @@ func ResourceDigitalOceanDatabaseKafkaTopic() *schema.Resource {
UpdateContext: resourceDigitalOceanDatabaseKafkaTopicUpdate,
DeleteContext: resourceDigitalOceanDatabaseKafkaTopicDelete,
Importer: &schema.ResourceImporter{
State: resourceDigitalOceanDatabaseUserImport,
State: resourceDigitalOceanDatabaseKafkaTopicImport,
},

Schema: map[string]*schema.Schema{
@@ -50,15 +52,21 @@ func ResourceDigitalOceanDatabaseKafkaTopic() *schema.Resource {
ValidateFunc: validation.IntAtLeast(2),
Default: 2,
},
"state": {
Type: schema.TypeString,
Computed: true,
},
"config": {
Type: schema.TypeList,
Optional: true,
Computed: true,
ForceNew: false,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"cleanup_policy": {
Type: schema.TypeString,
Optional: true,
Computed: true,
ValidateFunc: validation.StringInSlice([]string{
"delete",
"compact",
@@ -68,6 +76,7 @@ func ResourceDigitalOceanDatabaseKafkaTopic() *schema.Resource {
"compression_type": {
Type: schema.TypeString,
Optional: true,
Computed: true,
ValidateFunc: validation.StringInSlice([]string{
"snappy",
"gzip",
@@ -80,54 +89,54 @@ func ResourceDigitalOceanDatabaseKafkaTopic() *schema.Resource {
"delete_retention_ms": {
Type: schema.TypeString,
Optional: true,
ForceNew: false,
Computed: true,
ValidateFunc: validateUint64(),
},
"file_delete_delay_ms": {
Type: schema.TypeString,
Optional: true,
ForceNew: false,
Computed: true,
ValidateFunc: validateUint64(),
},
"flush_messages": {
Type: schema.TypeString,
Optional: true,
ForceNew: false,
Computed: true,
ValidateFunc: validateUint64(),
},
"flush_ms": {
Type: schema.TypeString,
Optional: true,
ForceNew: false,
Computed: true,
ValidateFunc: validateUint64(),
},
"index_interval_bytes": {
Type: schema.TypeString,
Optional: true,
ForceNew: false,
Computed: true,
ValidateFunc: validateUint64(),
},
"max_compaction_lag_ms": {
Type: schema.TypeString,
Optional: true,
ForceNew: false,
Computed: true,
ValidateFunc: validateUint64(),
},
"max_message_bytes": {
Type: schema.TypeString,
Optional: true,
ForceNew: false,
Computed: true,
ValidateFunc: validateUint64(),
},
"message_down_conversion_enable": {
Type: schema.TypeBool,
Optional: true,
ForceNew: false,
Computed: true,
},
"message_format_version": {
Type: schema.TypeString,
Optional: true,
ForceNew: false,
Computed: true,
ValidateFunc: validation.StringInSlice([]string{
"0.8.0",
"0.8.1",
@@ -204,13 +213,13 @@ func ResourceDigitalOceanDatabaseKafkaTopic() *schema.Resource {
"message_timestamp_difference_max_ms": {
Type: schema.TypeString,
Optional: true,
ForceNew: false,
Computed: true,
ValidateFunc: validateInt64(),
},
"message_timestamp_type": {
Type: schema.TypeString,
Optional: true,
ForceNew: false,
Computed: true,
ValidateFunc: validation.StringInSlice([]string{
"create_time",
"log_append_time",
@@ -219,71 +228,66 @@ func ResourceDigitalOceanDatabaseKafkaTopic() *schema.Resource {
"min_cleanable_dirty_ratio": {
Type: schema.TypeFloat,
Optional: true,
ForceNew: false,
Computed: true,
ValidateFunc: validation.FloatBetween(0.0, 1.0),
},
"min_compaction_lag_ms": {
Type: schema.TypeString,
Optional: true,
ForceNew: false,
Computed: true,
ValidateFunc: validateUint64(),
},
"min_insync_replicas": {
Type: schema.TypeInt,
Optional: true,
ForceNew: false,
Computed: true,
ValidateFunc: validation.IntAtLeast(1),
},
"preallocate": {
Type: schema.TypeBool,
Optional: true,
ForceNew: false,
},
"remote_storage_enable": {
Type: schema.TypeBool,
Optional: true,
ForceNew: false,
Computed: true,
},
"retention_bytes": {
Type: schema.TypeString,
Optional: true,
ForceNew: false,
Computed: true,
ValidateFunc: validateInt64(),
},
"retention_ms": {
Type: schema.TypeString,
Optional: true,
ForceNew: false,
Computed: true,
ValidateFunc: validateInt64(),
},
"segment_bytes": {
Type: schema.TypeString,
Optional: true,
ForceNew: false,
Computed: true,
ValidateFunc: validateUint64(),
},
"segment_index_bytes": {
Type: schema.TypeString,
Optional: true,
ForceNew: false,
Computed: true,
ValidateFunc: validateUint64(),
},
"segment_jitter_ms": {
Type: schema.TypeString,
Optional: true,
ForceNew: false,
Computed: true,
ValidateFunc: validateUint64(),
},
"segment_ms": {
Type: schema.TypeString,
Optional: true,
ForceNew: false,
Computed: true,
ValidateFunc: validateUint64(),
},
"unclean_leader_election_enable": {
Type: schema.TypeBool,
Optional: true,
ForceNew: false,
Computed: true,
},
},
},
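The recurring change in the schema hunks above swaps `ForceNew: false` for `Computed: true` on the optional `config` fields, so values the API fills in server-side are read into state instead of surfacing as perpetual drift. A minimal sketch of the resulting field shape, assuming the Terraform Plugin SDK v2 types this file already uses:

```go
package database

import (
	"github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
)

// exampleConfigField mirrors the pattern this commit applies to the optional
// Kafka topic config keys: Optional + Computed (ForceNew dropped), so
// API-populated defaults land in state without provoking a diff.
var exampleConfigField = &schema.Schema{
	Type:     schema.TypeString,
	Optional: true,
	Computed: true,
}
```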
@@ -364,11 +368,12 @@ func resourceDigitalOceanDatabaseKafkaTopicRead(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics {

d.Set("state", topic.State)
d.Set("replication_factor", topic.ReplicationFactor)
d.Set("partitions", topic.Partitions)
d.Set("partition_count", len(topic.Partitions))
// Updating `partition_count` is asynchronous; the number of partitions returned by the API
// will not be refreshed immediately in the response. Set this property from the current
// state rather than from the number of `partitions` returned in the GetTopic response.
d.Set("partition_count", d.Get("partition_count").(int))

if topic.Config != nil {
d.Set("config", flattenTopicConfig(topic.Config))
if err := d.Set("config", flattenTopicConfig(topic.Config)); err != nil {
return diag.Errorf("Error setting topic config: %#v", err)
}

return nil
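The hunk above reads `partition_count` back from state because partition resizes are applied asynchronously; echoing the API's not-yet-updated count into state would create a permanent plan diff. A toy standalone illustration of that choice (values made up):

```go
package main

import "fmt"

// Toy model of the read logic above: while a resize is in flight the API may
// still report the old partition count, so the provider keeps the value
// already recorded in state instead of the stale API number.
func main() {
	stateCount := 6 // partition_count from Terraform state
	apiCount := 3   // stale count from GetTopic during the resize
	_ = apiCount    // deliberately ignored, as in the commit
	fmt.Println("partition_count kept in state:", stateCount)
}
```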
@@ -394,26 +399,27 @@ func flattenTopicConfig(config *godo.TopicConfig) []map[string]interface{} {

item["cleanup_policy"] = config.CleanupPolicy
item["compression_type"] = config.CompressionType
item["delete_retention_ms"] = config.DeleteRetentionMS
item["file_delete_delay_ms"] = config.FileDeleteDelayMS
item["flush_messages"] = config.FlushMessages
item["flush_ms"] = config.FlushMS
item["index_interval_bytes"] = config.IndexIntervalBytes
item["max_compaction_lag_ms"] = config.MaxCompactionLagMS
item["max_message_bytes"] = config.MaxMessageBytes
item["message_down_conversion_enable"] = config.MessageDownConversionEnable
item["delete_retention_ms"] = strconv.FormatUint(*config.DeleteRetentionMS, 10)
item["file_delete_delay_ms"] = strconv.FormatUint(*config.FileDeleteDelayMS, 10)
item["flush_messages"] = strconv.FormatUint(*config.FlushMessages, 10)
item["flush_ms"] = strconv.FormatUint(*config.FlushMS, 10)
item["index_interval_bytes"] = strconv.FormatUint(*config.IndexIntervalBytes, 10)
item["max_compaction_lag_ms"] = strconv.FormatUint(*config.MaxCompactionLagMS, 10)
item["max_message_bytes"] = strconv.FormatUint(*config.MaxMessageBytes, 10)
item["message_down_conversion_enable"] = *config.MessageDownConversionEnable
item["message_format_version"] = config.MessageFormatVersion
item["message_timestamp_difference_max_ms"] = config.MessageTimestampDifferenceMaxMS
item["message_timestamp_difference_max_ms"] = strconv.FormatUint(*config.MessageTimestampDifferenceMaxMS, 10)
item["message_timestamp_type"] = config.MessageTimestampType
item["min_cleanable_dirty_ratio"] = config.MinCleanableDirtyRatio
item["min_compaction_lag_ms"] = config.MinCompactionLagMS
item["retention_bytes"] = config.RetentionBytes
item["retention_ms"] = config.RetentionMS
item["segment_bytes"] = config.SegmentBytes
item["segment_index_bytes"] = config.SegmentIndexBytes
item["segment_jitter_ms"] = config.SegmentJitterMS
item["segment_ms"] = config.SegmentMS
item["unclean_leader_election_enable"] = config.UncleanLeaderElectionEnable
item["min_cleanable_dirty_ratio"] = *config.MinCleanableDirtyRatio
item["min_compaction_lag_ms"] = strconv.FormatUint(*config.MinCompactionLagMS, 10)
item["min_insync_replicas"] = int(*config.MinInsyncReplicas)
item["retention_bytes"] = strconv.FormatInt(*config.RetentionBytes, 10)
item["retention_ms"] = strconv.FormatInt(*config.RetentionMS, 10)
item["segment_bytes"] = strconv.FormatUint(*config.SegmentBytes, 10)
item["segment_index_bytes"] = strconv.FormatUint(*config.SegmentIndexBytes, 10)
item["segment_jitter_ms"] = strconv.FormatUint(*config.SegmentJitterMS, 10)
item["segment_ms"] = strconv.FormatUint(*config.SegmentMS, 10)
item["unclean_leader_election_enable"] = *config.UncleanLeaderElectionEnable
result = append(result, item)

return result
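One caveat in the rewritten `flattenTopicConfig`: each config field is now dereferenced unconditionally, which assumes `GetTopic` always populates every pointer. A hypothetical nil-safe formatting helper (not part of this commit) would tolerate a partial response:

```go
package database

import "strconv"

// formatUint64Ptr is a hypothetical nil-safe variant of the conversions
// above: it renders an optional uint64 config value as the string the schema
// expects, and returns "" when the API omits the field.
func formatUint64Ptr(v *uint64) string {
	if v == nil {
		return ""
	}
	return strconv.FormatUint(*v, 10)
}
```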
@@ -447,6 +453,7 @@ func validateUint64() schema.SchemaValidateFunc {

func getTopicConfig(raw []interface{}) *godo.TopicConfig {
res := &godo.TopicConfig{}
res.CleanupPolicy = "compact_delete"
for _, kv := range raw {
cfg := kv.(map[string]interface{})

@@ -571,3 +578,16 @@

return res
}

func resourceDigitalOceanDatabaseKafkaTopicImport(d *schema.ResourceData, meta interface{}) ([]*schema.ResourceData, error) {
if strings.Contains(d.Id(), ",") {
s := strings.Split(d.Id(), ",")
d.SetId(makeDatabaseTopicID(s[0], s[1]))
d.Set("cluster_id", s[0])
d.Set("name", s[1])
} else {
return nil, errors.New("must use the ID of the source database cluster and the name of the topic joined with a comma (e.g. `id,name`)")
}

return []*schema.ResourceData{d}, nil
}
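With the importer fixed, an existing topic is imported by joining the cluster ID and topic name with a comma, e.g. `terraform import digitalocean_database_kafka_topic.topic-01 245bcfd0-7f31-4ce6-a2bc-475a116cca97,topic-01` (UUID made up). A standalone sketch of how that ID splits:

```go
package main

import (
	"fmt"
	"strings"
)

// Demonstrates the import ID format the new importer expects:
// "<cluster ID>,<topic name>". The UUID below is made up.
func main() {
	id := "245bcfd0-7f31-4ce6-a2bc-475a116cca97,topic-01"
	s := strings.SplitN(id, ",", 2)
	fmt.Println("cluster_id:", s[0])
	fmt.Println("name:", s[1])
}
```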