Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add support for topic description and ownership #1843

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ nav_order: 1
- Use `ServiceGet` from the code-generated client
- Use the code-generated client to manage `aiven_service_integration` and `aiven_service_integration_endpoint`
- Use Go 1.23
- Add capability to set description and owner group per topic.

roope-kar marked this conversation as resolved.
Show resolved Hide resolved
## [4.24.0] - 2024-08-21

Expand Down
2 changes: 2 additions & 0 deletions docs/data-sources/kafka_topic.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,12 @@ data "aiven_kafka_topic" "example_topic" {

- `config` (List of Object) [Advanced parameters](https://aiven.io/docs/products/kafka/reference/advanced-params) to configure topics. (see [below for nested schema](#nestedatt--config))
- `id` (String) The ID of this resource.
- `owner_user_group_id` (String) The user group that is the owner of the topic
- `partitions` (Number) The number of partitions to create in the topic.
- `replication` (Number) The replication factor for the topic.
- `tag` (Set of Object) Tags for the topic. (see [below for nested schema](#nestedatt--tag))
- `termination_protection` (Boolean) Prevents topics from being deleted by Terraform. It's recommended for topics containing critical data. **Topics can still be deleted in the Aiven Console.**
- `topic_description` (String) The description of the topic

<a id="nestedatt--config"></a>
### Nested Schema for `config`
Expand Down
2 changes: 2 additions & 0 deletions docs/resources/kafka_topic.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,11 @@ resource "aiven_kafka_topic" "example_topic" {
### Optional

- `config` (Block List, Max: 1) [Advanced parameters](https://aiven.io/docs/products/kafka/reference/advanced-params) to configure topics. (see [below for nested schema](#nestedblock--config))
- `owner_user_group_id` (String) The user group that is the owner of the topic
- `tag` (Block Set) Tags for the topic. (see [below for nested schema](#nestedblock--tag))
- `termination_protection` (Boolean) Prevents topics from being deleted by Terraform. It's recommended for topics containing critical data. **Topics can still be deleted in the Aiven Console.**
- `timeouts` (Block, Optional) (see [below for nested schema](#nestedblock--timeouts))
- `topic_description` (String) The description of the topic

### Read-Only

Expand Down
53 changes: 47 additions & 6 deletions internal/sdkprovider/service/kafkatopic/kafka_topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,16 @@ var aivenKafkaTopicSchema = map[string]*schema.Schema{
Required: true,
Description: "The replication factor for the topic.",
},
"topic_description": {
Type: schema.TypeString,
Optional: true,
Description: "The description of the topic",
},
"owner_user_group_id": {
Type: schema.TypeString,
Optional: true,
Description: "The user group that is the owner of the topic",
},
"termination_protection": {
Type: schema.TypeBool,
Optional: true,
Expand Down Expand Up @@ -308,6 +318,8 @@ func resourceKafkaTopicCreate(ctx context.Context, d *schema.ResourceData, m int
topicName := d.Get("topic_name").(string)
partitions := d.Get("partitions").(int)
replication := d.Get("replication").(int)
topicDescription := d.Get("topic_description").(string)
ownerUserGroupID := d.Get("owner_user_group_id").(string)

config, err := getKafkaTopicConfig(d)
if err != nil {
Expand All @@ -322,6 +334,14 @@ func resourceKafkaTopicCreate(ctx context.Context, d *schema.ResourceData, m int
Tags: getTags(d),
}

if topicDescription != "" {
createRequest.TopicDescription = &topicDescription
}

if ownerUserGroupID != "" {
createRequest.OwnerUserGroupId = &ownerUserGroupID
}

client := m.(*aiven.Client)
err = kafkatopicrepository.New(client.KafkaTopics).Create(ctx, project, serviceName, createRequest)
if err != nil {
Expand Down Expand Up @@ -464,6 +484,14 @@ func resourceKafkaTopicRead(ctx context.Context, d *schema.ResourceData, m inter
return diag.Errorf("error setting Kafka Topic Tags for resource %s: %s", d.Id(), err)
}

if err := d.Set("topic_description", topic.TopicDescription); err != nil {
return diag.FromErr(err)
}

if err := d.Set("owner_user_group_id", topic.OwnerUserGroupId); err != nil {
return diag.FromErr(err)
}

return nil
}

Expand Down Expand Up @@ -499,18 +527,31 @@ func resourceKafkaTopicUpdate(ctx context.Context, d *schema.ResourceData, m int
return diag.Errorf("config to json error: %s", err)
}

topicDescription := d.Get("topic_description").(string)
ownerUserGroupID := d.Get("owner_user_group_id").(string)

updateRequest := aiven.UpdateKafkaTopicRequest{
Partitions: &partitions,
Replication: schemautil.OptionalIntPointer(d, "replication"),
Config: config,
Tags: getTags(d),
}

if topicDescription != "" {
updateRequest.TopicDescription = &topicDescription
}

if ownerUserGroupID != "" {
updateRequest.OwnerUserGroupId = &ownerUserGroupID
}

client := m.(*aiven.Client)
err = kafkatopicrepository.New(client.KafkaTopics).Update(
ctx,
projectName,
serviceName,
topicName,
aiven.UpdateKafkaTopicRequest{
Partitions: &partitions,
Replication: schemautil.OptionalIntPointer(d, "replication"),
Config: config,
Tags: getTags(d),
},
updateRequest,
)
if err != nil {
return diag.FromErr(err)
Expand Down
83 changes: 61 additions & 22 deletions internal/sdkprovider/service/kafkatopic/kafka_topic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

func TestAccAivenKafkaTopic_basic(t *testing.T) {
resourceName := "aiven_kafka_topic.foo"
topic2ResourceName := "aiven_kafka_topic.topic2"
rName := acctest.RandStringFromCharSet(10, acctest.CharSetAlphaNum)
resource.ParallelTest(t, resource.TestCase{
PreCheck: func() { acc.TestAccPreCheck(t) },
Expand All @@ -43,6 +44,8 @@ func TestAccAivenKafkaTopic_basic(t *testing.T) {
resource.TestCheckResourceAttr(resourceName, "replication", "2"),
resource.TestCheckResourceAttr(resourceName, "termination_protection", "false"),
resource.TestCheckResourceAttr(resourceName, "config.0.retention_bytes", "1234"),
resource.TestCheckResourceAttr(topic2ResourceName, "topic_description", fmt.Sprintf("test-acc-topic2-desc-%s", rName)),
resource.TestCheckResourceAttrSet(topic2ResourceName, "owner_user_group_id"),
),
},
},
Expand Down Expand Up @@ -138,6 +141,16 @@ resource "aiven_kafka_topic" "more" {

func testAccKafkaTopicResource(name string) string {
return fmt.Sprintf(`
data "aiven_organization" "foo" {
name = "%s"
}

resource "aiven_organization_user_group" "foo" {
name = "test-acc-u-grp-%s"
organization_id = data.aiven_organization.foo.id
description = "test"
}

data "aiven_project" "foo" {
project = "%s"
}
Expand Down Expand Up @@ -173,7 +186,17 @@ data "aiven_kafka_topic" "topic" {
topic_name = aiven_kafka_topic.foo.topic_name

depends_on = [aiven_kafka_topic.foo]
}`, os.Getenv("AIVEN_PROJECT_NAME"), name, name)
}

resource "aiven_kafka_topic" "topic2" {
project = data.aiven_project.foo.project
service_name = aiven_kafka.bar.service_name
topic_name = "test-acc-topic2-%s"
topic_description = "test-acc-topic2-desc-%s"
owner_user_group_id = aiven_organization_user_group.foo.group_id
partitions = 3
replication = 2
}`, os.Getenv("AIVEN_ORGANIZATION_NAME"), name, os.Getenv("AIVEN_PROJECT_NAME"), name, name, name, name)
}
roope-kar marked this conversation as resolved.
Show resolved Hide resolved

func testAccKafkaTopicCustomTimeoutsResource(name string) string {
Expand Down Expand Up @@ -294,35 +317,51 @@ func testAccCheckAivenKafkaTopicResourceDestroy(s *terraform.State) error {

ctx := context.Background()

// loop through the resources in state, verifying each kafka topic is destroyed
// loop through the resources in state, verifying each created resource is destroyed
for _, rs := range s.RootModule().Resources {
if rs.Type != "aiven_kafka_topic" {
continue
}
if rs.Type == "aiven_kafka_topic" {
project, serviceName, topicName, err := schemautil.SplitResourceID3(rs.Primary.ID)
if err != nil {
return err
}

project, serviceName, topicName, err := schemautil.SplitResourceID3(rs.Primary.ID)
if err != nil {
return err
}
_, err = c.Services.Get(ctx, project, serviceName)
if err != nil {
if aiven.IsNotFound(err) {
return nil
}
return err
}

_, err = c.Services.Get(ctx, project, serviceName)
if err != nil {
if aiven.IsNotFound(err) {
return nil
t, err := c.KafkaTopics.Get(ctx, project, serviceName, topicName)
if err != nil {
if aiven.IsNotFound(err) {
return nil
}
return err
}
return err
}

t, err := c.KafkaTopics.Get(ctx, project, serviceName, topicName)
if err != nil {
if aiven.IsNotFound(err) {
return nil
if t != nil {
return fmt.Errorf("kafka topic (%s) still exists, id %s", topicName, rs.Primary.ID)
}
return err
}
if rs.Type == "aiven_organization_user_group" {
orgID, userGroupID, err := schemautil.SplitResourceID2(rs.Primary.ID)
if err != nil {
return err
}

r, err := c.OrganizationUserGroups.Get(ctx, orgID, userGroupID)
if err != nil {
if aiven.IsNotFound(err) {
return nil
}
return err
}

if t != nil {
return fmt.Errorf("kafka topic (%s) still exists, id %s", topicName, rs.Primary.ID)
if r != nil {
return fmt.Errorf("organization user group (%s) still exists", rs.Primary.ID)
}
}
}

Expand Down