Skip to content

Commit

Permalink
[aiven] feat: add topic id tagged field to create topic api
Browse files Browse the repository at this point in the history
  • Loading branch information
jeqo authored and juha-aiven committed Jul 31, 2024
1 parent 67fac03 commit 630ad58
Show file tree
Hide file tree
Showing 8 changed files with 91 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.kafka.clients.admin;

import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableReplicaAssignment;
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic;
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicConfig;
Expand All @@ -35,6 +36,7 @@
*/
public class NewTopic {

private final Uuid id;
private final String name;
private final Optional<Integer> numPartitions;
private final Optional<Short> replicationFactor;
Expand All @@ -48,12 +50,17 @@ public NewTopic(String name, int numPartitions, short replicationFactor) {
this(name, Optional.of(numPartitions), Optional.of(replicationFactor));
}

public NewTopic(String name, Optional<Integer> numPartitions, Optional<Short> replicationFactor) {
this(null, name, numPartitions, replicationFactor);
}

/**
* A new topic that optionally defaults {@code numPartitions} and {@code replicationFactor} to
* the broker configurations for {@code num.partitions} and {@code default.replication.factor}
* respectively.
*/
public NewTopic(String name, Optional<Integer> numPartitions, Optional<Short> replicationFactor) {
public NewTopic(Uuid id, String name, Optional<Integer> numPartitions, Optional<Short> replicationFactor) {
this.id = id;
this.name = name;
this.numPartitions = numPartitions;
this.replicationFactor = replicationFactor;
Expand All @@ -68,12 +75,17 @@ public NewTopic(String name, Optional<Integer> numPartitions, Optional<Short> re
* generally a good idea for all partitions to have the same number of replicas.
*/
public NewTopic(String name, Map<Integer, List<Integer>> replicasAssignments) {
this.id = null;
this.name = name;
this.numPartitions = Optional.empty();
this.replicationFactor = Optional.empty();
this.replicasAssignments = Collections.unmodifiableMap(replicasAssignments);
}

public Uuid id() {
return id;
}

/**
* The name of the topic to be created.
*/
Expand Down Expand Up @@ -126,6 +138,9 @@ CreatableTopic convertToCreatableTopic() {
setName(name).
setNumPartitions(numPartitions.orElse(CreateTopicsRequest.NO_NUM_PARTITIONS)).
setReplicationFactor(replicationFactor.orElse(CreateTopicsRequest.NO_REPLICATION_FACTOR));
if (id != null) {
creatableTopic.setId(id);
}
if (replicasAssignments != null) {
for (Entry<Integer, List<Integer>> entry : replicasAssignments.entrySet()) {
creatableTopic.assignments().add(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,9 @@
"about": "The configuration name." },
{ "name": "Value", "type": "string", "versions": "0+", "nullableVersions": "0+",
"about": "The configuration value." }
]}
]},
{ "name": "Id", "type": "uuid", "tag": 0, "taggedVersions": "5+", "versions": "5+", "ignorable": true,
"about": "Optional topic id."}
]},
{ "name": "timeoutMs", "type": "int32", "versions": "0+", "default": "60000",
"about": "How long to wait in milliseconds before timing out the request." },
Expand Down
13 changes: 10 additions & 3 deletions core/src/main/scala/kafka/server/ZkAdminManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,7 @@ import org.apache.kafka.admin.AdminUtils
import org.apache.kafka.clients.admin.{AlterConfigOp, ScramMechanism}
import org.apache.kafka.common.Uuid
import org.apache.kafka.common.config.{ConfigDef, ConfigException, ConfigResource}
import org.apache.kafka.common.errors.ThrottlingQuotaExceededException
import org.apache.kafka.common.errors.{ApiException, InvalidConfigurationException, InvalidPartitionsException, InvalidReplicaAssignmentException, InvalidRequestException, ReassignmentInProgressException, TopicExistsException, UnknownTopicOrPartitionException, UnsupportedVersionException}
import org.apache.kafka.common.errors.{ApiException, InvalidConfigurationException, InvalidPartitionsException, InvalidReplicaAssignmentException, InvalidRequestException, ReassignmentInProgressException, ThrottlingQuotaExceededException, TopicExistsException, UnknownTopicOrPartitionException, UnsupportedVersionException}
import org.apache.kafka.common.message.AlterUserScramCredentialsResponseData.AlterUserScramCredentialsResult
import org.apache.kafka.common.message.CreatePartitionsRequestData.CreatePartitionsTopic
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic
Expand Down Expand Up @@ -169,6 +168,14 @@ class ZkAdminManager(val config: KafkaConfig,
try {
if (metadataCache.contains(topic.name))
throw new TopicExistsException(s"Topic '${topic.name}' already exists.")
val maybeUuid = topic.id() match {
case Uuid.ZERO_UUID => None
case id =>
if (metadataCache.topicNamesToIds().containsValue(id)) {
throw new TopicExistsException(s"Topic id '$id' already exists.")
}
Some(id)
}

val nullConfigs = topic.configs.asScala.filter(_.value == null).map(_.name)
if (nullConfigs.nonEmpty)
Expand Down Expand Up @@ -213,7 +220,7 @@ class ZkAdminManager(val config: KafkaConfig,
CreatePartitionsMetadata(topic.name, assignments.keySet)
} else {
controllerMutationQuota.record(assignments.size)
adminZkClient.createTopicWithAssignment(topic.name, configs, assignments, validate = false, config.usesTopicId)
adminZkClient.createTopicWithAssignment(topic.name, configs, assignments, validate = false, config.usesTopicId, maybeUuid)
populateIds(includeConfigsAndMetadata, topic.name)
CreatePartitionsMetadata(topic.name, assignments.keySet)
}
Expand Down
15 changes: 11 additions & 4 deletions core/src/main/scala/kafka/zk/AdminZkClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,8 @@ class AdminZkClient(zkClient: KafkaZkClient,
config: Properties,
partitionReplicaAssignment: Map[Int, Seq[Int]],
validate: Boolean = true,
usesTopicId: Boolean = false): Unit = {
usesTopicId: Boolean = false,
maybeTopicId: Option[Uuid] = None): Unit = {
if (validate)
validateTopicCreate(topic, partitionReplicaAssignment, config)

Expand All @@ -116,7 +117,7 @@ class AdminZkClient(zkClient: KafkaZkClient,

// create the partition assignment
writeTopicPartitionAssignment(topic, partitionReplicaAssignment.map { case (k, v) => k -> ReplicaAssignment(v) },
isUpdate = false, usesTopicId)
isUpdate = false, usesTopicId, maybeTopicId)
}

/**
Expand Down Expand Up @@ -168,12 +169,18 @@ class AdminZkClient(zkClient: KafkaZkClient,
}

private def writeTopicPartitionAssignment(topic: String, replicaAssignment: Map[Int, ReplicaAssignment],
isUpdate: Boolean, usesTopicId: Boolean = false): Unit = {
isUpdate: Boolean, usesTopicId: Boolean = false,
maybeTopicId: Option[Uuid] = None): Unit = {
try {
val assignment = replicaAssignment.map { case (partitionId, replicas) => (new TopicPartition(topic,partitionId), replicas) }.toMap

if (!isUpdate) {
val topicIdOpt = if (usesTopicId) Some(Uuid.randomUuid()) else None
val topicIdOpt = if (usesTopicId) {
maybeTopicId match {
case None => Some(Uuid.randomUuid())
case _ => maybeTopicId
}
} else None
zkClient.createTopicAssignment(topic, topicIdOpt, assignment.map { case (k, v) => k -> v.replicas })
} else {
val topicIds = zkClient.getTopicIdsForTopics(Set(topic))
Expand Down
17 changes: 16 additions & 1 deletion core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import kafka.utils.TestUtils._
import kafka.utils.{Logging, TestUtils}
import kafka.zk._
import org.apache.kafka.admin.BrokerMetadata
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.{TopicPartition, Uuid}
import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.common.errors.{InvalidReplicaAssignmentException, InvalidTopicException, TopicExistsException}
import org.apache.kafka.common.metrics.Quota
Expand Down Expand Up @@ -57,6 +57,21 @@ class AdminZkClientTest extends QuorumTestHarness with Logging with RackAwareTes
super.tearDown()
}

@Test
def testCreateTopicWithId(): Unit = {
val brokers = List(0, 1, 2, 3, 4)
createBrokersInZk(zkClient, brokers)

val topicConfig = new Properties()

val assignment = Map(0 -> List(0, 1, 2),
1 -> List(1, 2, 3))
val uuid = Uuid.randomUuid
adminZkClient.createTopicWithAssignment("test", topicConfig, assignment, usesTopicId = true, maybeTopicId = Some(uuid))
val found = zkClient.getTopicIdsForTopics(Set("test"))
assertEquals(uuid, found("test"))
}

@Test
def testManualReplicaAssignment(): Unit = {
val brokers = List(0, 1, 2, 3, 4)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -689,6 +689,15 @@ private ApiError createTopic(ControllerRequestContext context,
boolean authorizedToReturnConfigs) {
Map<String, String> creationConfigs = translateCreationConfigs(topic.configs());
Map<Integer, PartitionRegistration> newParts = new HashMap<>();
Uuid topicId;
if (topic.id() == null || topic.id() == Uuid.ZERO_UUID) {
topicId = Uuid.randomUuid();
} else {
if (topics.containsKey(topic.id())) {
return ApiError.fromThrowable(new InvalidTopicException("Topic id " + topic.id() + " already exists"));
}
topicId = topic.id();
}
if (!topic.assignments().isEmpty()) {
if (topic.replicationFactor() != -1) {
return new ApiError(INVALID_REQUEST,
Expand Down Expand Up @@ -786,7 +795,6 @@ private ApiError createTopic(ControllerRequestContext context,
numPartitions, e.throttleTimeMs());
return ApiError.fromThrowable(e);
}
Uuid topicId = Uuid.randomUuid();
CreatableTopicResult result = new CreatableTopicResult().
setName(topic.name()).
setTopicId(topicId).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,12 +265,14 @@ void handleUncleanBrokerShutdown(int brokerId, List<ApiMessageAndVersion> record
replicationControl.handleBrokerUncleanShutdown(brokerId, records);
}

CreatableTopicResult createTestTopic(String name,
CreatableTopicResult createTestTopic(Uuid id,
String name,
int numPartitions,
short replicationFactor,
short expectedErrorCode) {
CreateTopicsRequestData request = new CreateTopicsRequestData();
CreatableTopic topic = new CreatableTopic().setName(name);
if (id != null) topic.setId(id);
topic.setNumPartitions(numPartitions).setReplicationFactor(replicationFactor);
request.topics().add(topic);
ControllerRequestContext requestContext = anonymousContextFor(ApiKeys.CREATE_TOPICS);
Expand All @@ -285,6 +287,13 @@ CreatableTopicResult createTestTopic(String name,
return topicResult;
}

CreatableTopicResult createTestTopic(String name,
int numPartitions,
short replicationFactor,
short expectedErrorCode) {
return createTestTopic(null, name, numPartitions, replicationFactor, expectedErrorCode);
}

CreatableTopicResult createTestTopic(String name, int[][] replicas) {
return createTestTopic(name, replicas, Collections.emptyMap(), (short) 0);
}
Expand Down Expand Up @@ -831,6 +840,19 @@ public void testInvalidCreateTopicsWithValidateOnlyFlag() {
assertEquals(expectedResponse, result.response());
}

@Test
public void testCreateTopicsWithId() throws Exception {
ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
ctx.registerBrokers(0, 1, 2);
ctx.unfenceBrokers(0, 1, 2);

Uuid id = Uuid.randomUuid();
CreatableTopicResult initialTopic = ctx.createTestTopic(id, "foo.bar", 2, (short) 2, NONE.code());
assertEquals(id, ctx.replicationControl.getTopic(initialTopic.topicId()).topicId());
CreatableTopicResult resultWithErrors = ctx.createTestTopic(id, "foo.baz", 2, (short) 2, INVALID_TOPIC_EXCEPTION.code());
assertEquals("Topic id " + id + " already exists", resultWithErrors.errorMessage());
}

@Test
public void testCreateTopicsWithPolicy() {
MockCreateTopicPolicy createTopicPolicy = new MockCreateTopicPolicy(asList(
Expand Down
4 changes: 3 additions & 1 deletion tools/src/main/java/org/apache/kafka/tools/TopicCommand.java
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,7 @@ static class CommandTopicPartition {
private final Optional<Integer> replicationFactor;
private final Map<Integer, List<Integer>> replicaAssignment;
private final Properties configsToAdd;
private final Optional<Uuid> topicId;

private final TopicCommandOptions opts;

Expand All @@ -265,6 +266,7 @@ public CommandTopicPartition(TopicCommandOptions options) {
replicationFactor = options.replicationFactor();
replicaAssignment = options.replicaAssignment().orElse(Collections.emptyMap());
configsToAdd = parseTopicConfigsToBeAdded(options);
topicId = options.topicId().map(Uuid::fromString);
}

public Boolean hasReplicaAssignment() {
Expand Down Expand Up @@ -478,7 +480,7 @@ public void createTopic(CommandTopicPartition topic) throws Exception {
if (topic.hasReplicaAssignment()) {
newTopic = new NewTopic(topic.name, topic.replicaAssignment);
} else {
newTopic = new NewTopic(topic.name, topic.partitions, topic.replicationFactor.map(Integer::shortValue));
newTopic = new NewTopic(topic.topicId.orElse(null), topic.name, topic.partitions, topic.replicationFactor.map(Integer::shortValue));
}

Map<String, String> configsMap = topic.configsToAdd.stringPropertyNames().stream()
Expand Down

0 comments on commit 630ad58

Please sign in to comment.