Skip to content

Commit

Permalink
fix(mirrormaker_replication_flow): incorrect behavior of schema fields
Browse files Browse the repository at this point in the history
  • Loading branch information
Serpentiel committed Mar 15, 2024
1 parent f5dc3ed commit b47f77b
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 25 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,12 @@ nav_order: 1
- Add `AIVEN_ALLOW_IP_FILTER_PURGE` environment variable to allow purging of IP filters. This is a safety feature to
prevent accidental purging of IP filters, which can lead to loss of access to services. To enable purging, set the
environment variable to any value before running Terraform commands.
- Fix incorrect behavior of `aiven_mirrormaker_replication_flow` schema fields:
- `sync_group_offsets_enabled`
- `sync_group_offsets_interval_seconds`
- `emit_backward_heartbeats_enabled`
- `offset_syncs_topic_location`
- `replication_policy_class`

## [4.14.0] - 2024-02-20

Expand Down
4 changes: 2 additions & 2 deletions docs/resources/mirrormaker_replication_flow.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@ resource "aiven_mirrormaker_replication_flow" "f1" {
### Required

- `enable` (Boolean) Enable of disable replication flows for a service.
- `offset_syncs_topic_location` (String) Offset syncs topic location.
- `project` (String) Identifies the project this resource belongs to. To set up proper dependencies please refer to this variable as a reference. Changing this property forces recreation of the resource.
- `replication_policy_class` (String) Replication policy class. The possible values are `org.apache.kafka.connect.mirror.DefaultReplicationPolicy` and `org.apache.kafka.connect.mirror.IdentityReplicationPolicy`. The default value is `org.apache.kafka.connect.mirror.DefaultReplicationPolicy`.
- `service_name` (String) Identifies the project this resource belongs to. To set up proper dependencies please refer to this variable as a reference. Changing this property forces recreation of the resource.
- `source_cluster` (String) Source cluster alias. Maximum length: `128`.
- `target_cluster` (String) Target cluster alias. Maximum length: `128`.
Expand All @@ -47,8 +49,6 @@ resource "aiven_mirrormaker_replication_flow" "f1" {

- `emit_backward_heartbeats_enabled` (Boolean) Whether to emit heartbeats to the direction opposite to the flow, i.e. to the source cluster. The default value is `false`.
- `emit_heartbeats_enabled` (Boolean) Whether to emit heartbeats to the target cluster. The default value is `false`.
- `offset_syncs_topic_location` (String) Offset syncs topic location.
- `replication_policy_class` (String) Replication policy class. The possible values are `org.apache.kafka.connect.mirror.DefaultReplicationPolicy` and `org.apache.kafka.connect.mirror.IdentityReplicationPolicy`. The default value is `org.apache.kafka.connect.mirror.DefaultReplicationPolicy`.
- `sync_group_offsets_enabled` (Boolean) Sync consumer group offsets. The default value is `false`.
- `sync_group_offsets_interval_seconds` (Number) Frequency of consumer group offset sync. The default value is `1`.
- `timeouts` (Block, Optional) (see [below for nested schema](#nestedblock--timeouts))
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/aiven/terraform-provider-aiven
go 1.22

require (
github.com/aiven/aiven-go-client/v2 v2.13.0
github.com/aiven/aiven-go-client/v2 v2.14.0
github.com/aiven/go-client-codegen v0.2.0
github.com/avast/retry-go v3.0.0+incompatible
github.com/dave/jennifer v1.7.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,8 @@ github.com/agext/levenshtein v1.2.3 h1:YB2fHEn0UJagG8T1rrWknE3ZQzWM06O8AMAatNn7l
github.com/agext/levenshtein v1.2.3/go.mod h1:JEDfjyjHDjOF/1e4FlBE/PkbqA9OfWu2ki2W0IB5558=
github.com/aiven/aiven-go-client/v2 v2.13.0 h1:N2c+EdRh9gg2PBKAwII6vMy9+WnDnbVU4Uy1mQGmMKg=
github.com/aiven/aiven-go-client/v2 v2.13.0/go.mod h1:x0xhzxWEKAwKv0xY5FvECiI6tesWshcPHvjwl0B/1SU=
github.com/aiven/aiven-go-client/v2 v2.14.0 h1:4FEbB3baj+jA3Gd3vXpzoN6Vi9lGLCcIAYo75vSnx4k=
github.com/aiven/aiven-go-client/v2 v2.14.0/go.mod h1:x0xhzxWEKAwKv0xY5FvECiI6tesWshcPHvjwl0B/1SU=
github.com/aiven/go-api-schemas v1.65.0 h1:r4ooY83kWwkQQPCq55W4oHitNv+SZ2fzVDxuY3KwU28=
github.com/aiven/go-api-schemas v1.65.0/go.mod h1:/bPxBUHza/2Aeer6hIIdB++GxKiw9K1KCBtRa2rtZ5I=
github.com/aiven/go-client-codegen v0.2.0 h1:f82CHhXCBbrBIUa3yworytXjiolp/Q73epnbRjQIwBk=
Expand Down
73 changes: 51 additions & 22 deletions internal/sdkprovider/service/kafka/mirrormaker_replication_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,11 @@ var aivenMirrorMakerReplicationFlowSchema = map[string]*schema.Schema{
},
"replication_policy_class": {
Type: schema.TypeString,
Optional: true,
Default: defaultReplicationPolicy,
Required: true,
ValidateFunc: validation.StringInSlice(replicationPolicies, false),
Description: userconfig.Desc("Replication policy class.").DefaultValue(defaultReplicationPolicy).PossibleValues(schemautil.StringSliceToInterfaceSlice(replicationPolicies)...).Build(),
Description: userconfig.Desc("Replication policy class.").
DefaultValue(defaultReplicationPolicy).
PossibleValues(schemautil.StringSliceToInterfaceSlice(replicationPolicies)...).Build(),
},
"sync_group_offsets_enabled": {
Type: schema.TypeBool,
Expand Down Expand Up @@ -102,7 +103,7 @@ var aivenMirrorMakerReplicationFlowSchema = map[string]*schema.Schema{
},
"offset_syncs_topic_location": {
Type: schema.TypeString,
Optional: true,
Required: true,
Description: "Offset syncs topic location.",
ValidateFunc: validation.StringInSlice([]string{"source", "target"}, false),
},
Expand Down Expand Up @@ -144,10 +145,10 @@ func resourceMirrorMakerReplicationFlowCreate(ctx context.Context, d *schema.Res
Topics: schemautil.FlattenToString(d.Get("topics").([]interface{})),
TopicsBlacklist: schemautil.FlattenToString(d.Get("topics_blacklist").([]interface{})),
ReplicationPolicyClass: d.Get("replication_policy_class").(string),
SyncGroupOffsetsEnabled: d.Get("sync_group_offsets_enabled").(bool),
SyncGroupOffsetsIntervalSeconds: d.Get("sync_group_offsets_interval_seconds").(int),
EmitHeartbeatsEnabled: d.Get("emit_heartbeats_enabled").(bool),
EmitBackwardHeartbeatsEnabled: d.Get("emit_backward_heartbeats_enabled").(bool),
SyncGroupOffsetsEnabled: schemautil.OptionalBoolPointer(d, "sync_group_offsets_enabled"),
SyncGroupOffsetsIntervalSeconds: schemautil.OptionalIntPointer(d, "sync_group_offsets_interval_seconds"),
EmitHeartbeatsEnabled: schemautil.OptionalBoolPointer(d, "emit_heartbeats_enabled"),
EmitBackwardHeartbeatsEnabled: schemautil.OptionalBoolPointer(d, "emit_backward_heartbeats_enabled"),
OffsetSyncsTopicLocation: d.Get("offset_syncs_topic_location").(string),
},
})
Expand All @@ -168,7 +169,9 @@ func resourceMirrorMakerReplicationFlowRead(ctx context.Context, d *schema.Resou
return diag.FromErr(err)
}

replicationFlow, err := client.KafkaMirrorMakerReplicationFlow.Get(ctx, project, serviceName, sourceCluster, targetCluster)
replicationFlow, err := client.KafkaMirrorMakerReplicationFlow.Get(
ctx, project, serviceName, sourceCluster, targetCluster,
)
if err != nil {
return diag.FromErr(schemautil.ResourceReadHandleNotFound(err, d))
}
Expand All @@ -194,21 +197,47 @@ func resourceMirrorMakerReplicationFlowRead(ctx context.Context, d *schema.Resou
if err := d.Set("topics_blacklist", replicationFlow.ReplicationFlow.TopicsBlacklist); err != nil {
return diag.FromErr(err)
}
if err := d.Set("replication_policy_class", replicationFlow.ReplicationFlow.ReplicationPolicyClass); err != nil {
if err := d.Set(
"replication_policy_class",
replicationFlow.ReplicationFlow.ReplicationPolicyClass,
); err != nil {
return diag.FromErr(err)
}
if err := d.Set("sync_group_offsets_enabled", replicationFlow.ReplicationFlow.SyncGroupOffsetsEnabled); err != nil {
return diag.FromErr(err)
if replicationFlow.ReplicationFlow.SyncGroupOffsetsEnabled != nil {
if err := d.Set(
"sync_group_offsets_enabled",
*replicationFlow.ReplicationFlow.SyncGroupOffsetsEnabled,
); err != nil {
return diag.FromErr(err)
}
}
if err := d.Set("sync_group_offsets_interval_seconds", replicationFlow.ReplicationFlow.SyncGroupOffsetsIntervalSeconds); err != nil {
return diag.FromErr(err)
if replicationFlow.ReplicationFlow.SyncGroupOffsetsIntervalSeconds != nil {
if err := d.Set(
"sync_group_offsets_interval_seconds",
*replicationFlow.ReplicationFlow.SyncGroupOffsetsIntervalSeconds,
); err != nil {
return diag.FromErr(err)
}
}
if err := d.Set("emit_heartbeats_enabled", replicationFlow.ReplicationFlow.EmitHeartbeatsEnabled); err != nil {
return diag.FromErr(err)
if replicationFlow.ReplicationFlow.EmitHeartbeatsEnabled != nil {
if err := d.Set(
"emit_heartbeats_enabled",
*replicationFlow.ReplicationFlow.EmitHeartbeatsEnabled,
); err != nil {
return diag.FromErr(err)
}
}
if replicationFlow.ReplicationFlow.EmitBackwardHeartbeatsEnabled != nil {
if err := d.Set(
"emit_backward_heartbeats_enabled",
*replicationFlow.ReplicationFlow.EmitBackwardHeartbeatsEnabled,
); err != nil {
return diag.FromErr(err)
}
}
if err := d.Set(
"emit_backward_heartbeats_enabled",
replicationFlow.ReplicationFlow.EmitBackwardHeartbeatsEnabled,
"offset_syncs_topic_location",
replicationFlow.ReplicationFlow.OffsetSyncsTopicLocation,
); err != nil {
return diag.FromErr(err)
}
Expand Down Expand Up @@ -239,10 +268,10 @@ func resourceMirrorMakerReplicationFlowUpdate(ctx context.Context, d *schema.Res
Topics: schemautil.FlattenToString(d.Get("topics").([]interface{})),
TopicsBlacklist: schemautil.FlattenToString(d.Get("topics_blacklist").([]interface{})),
ReplicationPolicyClass: d.Get("replication_policy_class").(string),
SyncGroupOffsetsEnabled: d.Get("sync_group_offsets_enabled").(bool),
SyncGroupOffsetsIntervalSeconds: d.Get("sync_group_offsets_interval_seconds").(int),
EmitHeartbeatsEnabled: d.Get("emit_heartbeats_enabled").(bool),
EmitBackwardHeartbeatsEnabled: d.Get("emit_backward_heartbeats_enabled").(bool),
SyncGroupOffsetsEnabled: schemautil.OptionalBoolPointer(d, "sync_group_offsets_enabled"),
SyncGroupOffsetsIntervalSeconds: schemautil.OptionalIntPointer(d, "sync_group_offsets_interval_seconds"),
EmitHeartbeatsEnabled: schemautil.OptionalBoolPointer(d, "emit_heartbeats_enabled"),
EmitBackwardHeartbeatsEnabled: schemautil.OptionalBoolPointer(d, "emit_backward_heartbeats_enabled"),
OffsetSyncsTopicLocation: d.Get("offset_syncs_topic_location").(string),
},
},
Expand Down

0 comments on commit b47f77b

Please sign in to comment.