Skip to content

Commit

Permalink
feat: Add support for description and owner for topics
Browse files Browse the repository at this point in the history
makes it possible to provide topic_description and owner_user_group_id
when creating and updating topic. Also adds it to the schema.
  • Loading branch information
roope-kar committed Sep 5, 2024
1 parent fe8f1de commit d9390fd
Showing 2 changed files with 27 additions and 3 deletions.
8 changes: 8 additions & 0 deletions kafka_topic.go
Original file line number Diff line number Diff line change
@@ -122,6 +122,8 @@ type (
TopicName string `json:"topic_name"`
Config KafkaTopicConfigResponse `json:"config"`
Tags []KafkaTopicTag `json:"tags,omitempty"`
TopicDescription string `json:"topic_description"`
OwnerUserGroupId string `json:"owner_user_group_id"`
}

// KafkaListTopic represents kafka list topic model on Aiven.
@@ -134,6 +136,8 @@ type (
RetentionHours *int64 `json:"retention_hours,omitempty"`
State string `json:"state"`
TopicName string `json:"topic_name"`
TopicDescription string `json:"topic_description"`
OwnerUserGroupId string `json:"owner_user_group_id"`
}

// Partition represents a Kafka partition.
@@ -169,6 +173,8 @@ type (
TopicName string `json:"topic_name"`
Config KafkaTopicConfig `json:"config"`
Tags []KafkaTopicTag `json:"tags,omitempty"`
TopicDescription string `json:"topic_description,omitempty"`
OwnerUserGroupId string `json:"owner_user_group_id,omitempty"`
}

// UpdateKafkaTopicRequest are the parameters used to update a kafka topic.
@@ -180,6 +186,8 @@ type (
RetentionHours *int `json:"retention_hours,omitempty"`
Config KafkaTopicConfig `json:"config"`
Tags []KafkaTopicTag `json:"tags,omitempty"`
TopicDescription string `json:"topic_description,omitempty"`
OwnerUserGroupId string `json:"owner_user_group_id,omitempty"`
}

// KafkaTopicResponse is the response for Kafka Topic requests.
22 changes: 19 additions & 3 deletions kafka_topic_acc_test.go
Original file line number Diff line number Diff line change
@@ -75,12 +75,16 @@ var _ = Describe("Kafka Topic", func() {

// kafka topic
var (
errC error
topicName string
segmentJitterMs int64
errC error
topicName string
segmentJitterMs int64
topicDescription string
ownerUserGroupId string
)
segmentJitterMs = 10
topicName = "test1"
topicDescription = "test-description"
ownerUserGroupId = "org22ba494e096"

It("create kafka topic", func() {
time.Sleep(10 * time.Second)
@@ -96,6 +100,8 @@ var _ = Describe("Kafka Topic", func() {
Value: "tag1-value",
},
},
TopicDescription: topicDescription,
OwnerUserGroupId: ownerUserGroupId,
})

Eventually(func() string {
@@ -123,13 +129,18 @@ var _ = Describe("Kafka Topic", func() {
Expect(t.Tags).NotTo(BeEmpty())
Expect(t.Tags[0].Key).To(Equal("tag1-key"))
Expect(t.Tags[0].Value).To(Equal("tag1-value"))
Expect(t.TopicDescription).To(Equal(topicDescription))
Expect(t.OwnerUserGroupId).To(Equal(ownerUserGroupId))
}
})

It("should update topic config", func() {
var uncleanLeaderElectionEnable = true
var newTopicDescription = "test-description-2"
var newOwnerUserGroupId = "org22ba494e097"

errU := client.KafkaTopics.Update(ctx, projectName, serviceName, topicName, UpdateKafkaTopicRequest{
TopicDescription: newTopicDescription,
Config: KafkaTopicConfig{
UncleanLeaderElectionEnable: &uncleanLeaderElectionEnable,
},
@@ -154,6 +165,8 @@ var _ = Describe("Kafka Topic", func() {
Expect(t2.Config.UncleanLeaderElectionEnable.Value).Should(Equal(true))
Expect(t2.Tags).ShouldNot(BeEmpty())
Expect(len(t2.Tags)).To(Equal(2))
Expect(t2.TopicDescription).To(Equal(newTopicDescription))
Expect(t2.OwnerUserGroupId).To(Equal(newOwnerUserGroupId))
}
})

@@ -166,6 +179,9 @@ var _ = Describe("Kafka Topic", func() {
Expect(list[0].Config.SegmentJitterMs.Value).To(Equal(segmentJitterMs))
Expect(list[0].Tags).NotTo(BeEmpty())
Expect(len(list[0].Tags)).To(Equal(2))
Expect(list[0].TopicDescription).To(Equal(topicDescription))
Expect(list[0].OwnerUserGroupId).To(Equal(ownerUserGroupId))

})

It("delete Kafka Topic and Kafka service", func() {

0 comments on commit d9390fd

Please sign in to comment.