From 9a6019444d7090c4208ed9d8e5497e2880311ea3 Mon Sep 17 00:00:00 2001 From: Giuseppe Lillo Date: Fri, 5 Jan 2024 15:08:50 +0100 Subject: [PATCH] [aiven] fix ZK topic creation --- core/src/main/scala/kafka/server/ZkAdminManager.scala | 9 +++++---- core/src/main/scala/kafka/zk/AdminZkClient.scala | 2 +- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/kafka/server/ZkAdminManager.scala b/core/src/main/scala/kafka/server/ZkAdminManager.scala index 47186b61903fd..69b7f61ee9aa8 100644 --- a/core/src/main/scala/kafka/server/ZkAdminManager.scala +++ b/core/src/main/scala/kafka/server/ZkAdminManager.scala @@ -166,13 +166,14 @@ class ZkAdminManager(val config: KafkaConfig, try { if (metadataCache.contains(topic.name)) throw new TopicExistsException(s"Topic '${topic.name}' already exists.") - val maybeUuid = Option(topic.id()) - maybeUuid match { - case Some(id) => + + val maybeUuid = topic.id() match { + case Uuid.ZERO_UUID => None + case id => if (metadataCache.topicNamesToIds().containsValue(id)) { throw new TopicExistsException(s"Topic id '$id' already exists.") } - case _ => + Some(id) } val nullConfigs = topic.configs.asScala.filter(_.value == null).map(_.name) diff --git a/core/src/main/scala/kafka/zk/AdminZkClient.scala b/core/src/main/scala/kafka/zk/AdminZkClient.scala index dd8c91982fe33..df0935cc51b2c 100644 --- a/core/src/main/scala/kafka/zk/AdminZkClient.scala +++ b/core/src/main/scala/kafka/zk/AdminZkClient.scala @@ -109,7 +109,7 @@ class AdminZkClient(zkClient: KafkaZkClient, validateTopicCreate(topic, partitionReplicaAssignment, config) info(s"Creating topic $topic with configuration $config and initial partition " + - s"assignment $partitionReplicaAssignment") + s"assignment $partitionReplicaAssignment, usesTopicId $usesTopicId topicId $maybeTopicId") // write out the config if there is any, this isn't transactional with the partition assignments zkClient.setOrCreateEntityConfigs(ConfigType.Topic, topic, config)