Skip to content

Commit

Permalink
[aiven] fix ZK topic creation
Browse files Browse the repository at this point in the history
  • Loading branch information
giuseppelillo committed Jan 5, 2024
1 parent b2a9e6f commit 9a60194
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 5 deletions.
9 changes: 5 additions & 4 deletions core/src/main/scala/kafka/server/ZkAdminManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -166,13 +166,14 @@ class ZkAdminManager(val config: KafkaConfig,
try {
if (metadataCache.contains(topic.name))
throw new TopicExistsException(s"Topic '${topic.name}' already exists.")
val maybeUuid = Option(topic.id())
maybeUuid match {
case Some(id) =>

val maybeUuid = topic.id() match {
case Uuid.ZERO_UUID => None
case id =>
if (metadataCache.topicNamesToIds().containsValue(id)) {
throw new TopicExistsException(s"Topic id '$id' already exists.")
}
case _ =>
Some(id)
}

val nullConfigs = topic.configs.asScala.filter(_.value == null).map(_.name)
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/zk/AdminZkClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ class AdminZkClient(zkClient: KafkaZkClient,
validateTopicCreate(topic, partitionReplicaAssignment, config)

info(s"Creating topic $topic with configuration $config and initial partition " +
s"assignment $partitionReplicaAssignment")
s"assignment $partitionReplicaAssignment, usesTopicId $usesTopicId topicId $maybeTopicId")

// write out the config if there is any, this isn't transactional with the partition assignments
zkClient.setOrCreateEntityConfigs(ConfigType.Topic, topic, config)
Expand Down

0 comments on commit 9a60194

Please sign in to comment.