Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Create topic with id #43

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package org.apache.kafka.clients.admin;

import java.util.Optional;

import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableReplicaAssignment;
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic;
import org.apache.kafka.common.message.CreateTopicsRequestData.CreateableTopicConfig;
Expand All @@ -35,6 +37,7 @@
*/
public class NewTopic {

private final Uuid id;
private final String name;
private final Optional<Integer> numPartitions;
private final Optional<Short> replicationFactor;
Expand All @@ -48,12 +51,17 @@ public NewTopic(String name, int numPartitions, short replicationFactor) {
this(name, Optional.of(numPartitions), Optional.of(replicationFactor));
}

public NewTopic(String name, Optional<Integer> numPartitions, Optional<Short> replicationFactor) {
this(null, name, numPartitions, replicationFactor);
}

/**
* A new topic that optionally defaults {@code numPartitions} and {@code replicationFactor} to
* the broker configurations for {@code num.partitions} and {@code default.replication.factor}
* respectively.
*/
public NewTopic(String name, Optional<Integer> numPartitions, Optional<Short> replicationFactor) {
public NewTopic(Uuid id, String name, Optional<Integer> numPartitions, Optional<Short> replicationFactor) {
this.id = id;
this.name = name;
this.numPartitions = numPartitions;
this.replicationFactor = replicationFactor;
Expand All @@ -68,12 +76,17 @@ public NewTopic(String name, Optional<Integer> numPartitions, Optional<Short> re
* generally a good idea for all partitions to have the same number of replicas.
*/
public NewTopic(String name, Map<Integer, List<Integer>> replicasAssignments) {
this.id = null;
this.name = name;
this.numPartitions = Optional.empty();
this.replicationFactor = Optional.empty();
this.replicasAssignments = Collections.unmodifiableMap(replicasAssignments);
}

public Uuid id() {
return id;
}

/**
* The name of the topic to be created.
*/
Expand Down Expand Up @@ -126,6 +139,10 @@ CreatableTopic convertToCreatableTopic() {
setName(name).
setNumPartitions(numPartitions.orElse(CreateTopicsRequest.NO_NUM_PARTITIONS)).
setReplicationFactor(replicationFactor.orElse(CreateTopicsRequest.NO_REPLICATION_FACTOR));
if (id != null) {
System.out.println("Setting topic with id: " + id);
creatableTopic.setId(id);
}
if (replicasAssignments != null) {
for (Entry<Integer, List<Integer>> entry : replicasAssignments.entrySet()) {
creatableTopic.assignments().add(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,9 @@
"about": "The configuration name." },
{ "name": "Value", "type": "string", "versions": "0+", "nullableVersions": "0+",
"about": "The configuration value." }
]}
]},
{ "name": "Id", "type": "uuid", "tag": 0, "taggedVersions": "5+", "versions": "5+", "ignorable": true,
"about": "Optional topic id."}
]},
{ "name": "timeoutMs", "type": "int32", "versions": "0+", "default": "60000",
"about": "How long to wait in milliseconds before timing out the request." },
Expand Down
2 changes: 2 additions & 0 deletions core/src/main/scala/kafka/admin/TopicCommand.scala
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ object TopicCommand extends Logging {
val replicationFactor = opts.replicationFactor
val replicaAssignment = opts.replicaAssignment
val configsToAdd = parseTopicConfigsToBeAdded(opts)
val topicId = opts.topicId.map(Uuid.fromString)

def hasReplicaAssignment: Boolean = replicaAssignment.isDefined
def ifTopicDoesntExist(): Boolean = opts.ifNotExists
Expand Down Expand Up @@ -229,6 +230,7 @@ object TopicCommand extends Logging {
new NewTopic(topic.name, asJavaReplicaReassignment(topic.replicaAssignment.get))
else {
new NewTopic(
topic.topicId.orNull,
topic.name,
topic.partitions.asJava,
topic.replicationFactor.map(_.toShort).map(Short.box).asJava)
Expand Down
13 changes: 10 additions & 3 deletions core/src/main/scala/kafka/server/ZkAdminManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,7 @@ import org.apache.kafka.admin.AdminUtils
import org.apache.kafka.clients.admin.{AlterConfigOp, ScramMechanism}
import org.apache.kafka.common.Uuid
import org.apache.kafka.common.config.{ConfigDef, ConfigException, ConfigResource}
import org.apache.kafka.common.errors.ThrottlingQuotaExceededException
import org.apache.kafka.common.errors.{ApiException, InvalidConfigurationException, InvalidPartitionsException, InvalidReplicaAssignmentException, InvalidRequestException, ReassignmentInProgressException, TopicExistsException, UnknownTopicOrPartitionException, UnsupportedVersionException}
import org.apache.kafka.common.errors.{ApiException, InvalidConfigurationException, InvalidPartitionsException, InvalidReplicaAssignmentException, InvalidRequestException, ReassignmentInProgressException, ThrottlingQuotaExceededException, TopicExistsException, UnknownTopicOrPartitionException, UnsupportedVersionException}
import org.apache.kafka.common.message.AlterUserScramCredentialsResponseData.AlterUserScramCredentialsResult
import org.apache.kafka.common.message.CreatePartitionsRequestData.CreatePartitionsTopic
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic
Expand Down Expand Up @@ -167,6 +166,14 @@ class ZkAdminManager(val config: KafkaConfig,
try {
if (metadataCache.contains(topic.name))
throw new TopicExistsException(s"Topic '${topic.name}' already exists.")
val maybeUuid = topic.id() match {
case Uuid.ZERO_UUID => None
case id =>
if (metadataCache.topicNamesToIds().containsValue(id)) {
throw new TopicExistsException(s"Topic id '$id' already exists.")
}
Some(id)
}

val nullConfigs = topic.configs.asScala.filter(_.value == null).map(_.name)
if (nullConfigs.nonEmpty)
Expand Down Expand Up @@ -211,7 +218,7 @@ class ZkAdminManager(val config: KafkaConfig,
CreatePartitionsMetadata(topic.name, assignments.keySet)
} else {
controllerMutationQuota.record(assignments.size)
adminZkClient.createTopicWithAssignment(topic.name, configs, assignments, validate = false, config.usesTopicId)
adminZkClient.createTopicWithAssignment(topic.name, configs, assignments, validate = false, config.usesTopicId, maybeUuid)
populateIds(includeConfigsAndMetadata, topic.name)
CreatePartitionsMetadata(topic.name, assignments.keySet)
}
Expand Down
15 changes: 11 additions & 4 deletions core/src/main/scala/kafka/zk/AdminZkClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,8 @@ class AdminZkClient(zkClient: KafkaZkClient,
config: Properties,
partitionReplicaAssignment: Map[Int, Seq[Int]],
validate: Boolean = true,
usesTopicId: Boolean = false): Unit = {
usesTopicId: Boolean = false,
maybeTopicId: Option[Uuid] = None): Unit = {
if (validate)
validateTopicCreate(topic, partitionReplicaAssignment, config)

Expand All @@ -114,7 +115,7 @@ class AdminZkClient(zkClient: KafkaZkClient,

// create the partition assignment
writeTopicPartitionAssignment(topic, partitionReplicaAssignment.map { case (k, v) => k -> ReplicaAssignment(v) },
isUpdate = false, usesTopicId)
isUpdate = false, usesTopicId, maybeTopicId)
}

/**
Expand Down Expand Up @@ -166,12 +167,18 @@ class AdminZkClient(zkClient: KafkaZkClient,
}

private def writeTopicPartitionAssignment(topic: String, replicaAssignment: Map[Int, ReplicaAssignment],
isUpdate: Boolean, usesTopicId: Boolean = false): Unit = {
isUpdate: Boolean, usesTopicId: Boolean = false,
maybeTopicId: Option[Uuid] = None): Unit = {
try {
val assignment = replicaAssignment.map { case (partitionId, replicas) => (new TopicPartition(topic,partitionId), replicas) }.toMap

if (!isUpdate) {
val topicIdOpt = if (usesTopicId) Some(Uuid.randomUuid()) else None
val topicIdOpt = if (usesTopicId) {
maybeTopicId match {
case None => Some(Uuid.randomUuid())
case _ => maybeTopicId
}
} else None
zkClient.createTopicAssignment(topic, topicIdOpt, assignment.map { case (k, v) => k -> v.replicas })
} else {
val topicIds = zkClient.getTopicIdsForTopics(Set(topic))
Expand Down
17 changes: 16 additions & 1 deletion core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import kafka.utils.CoreUtils._
import kafka.utils.TestUtils._
import kafka.utils.{Logging, TestUtils}
import kafka.zk.{AdminZkClient, ConfigEntityTypeZNode, KafkaZkClient}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.{TopicPartition, Uuid}
import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.common.config.internals.QuotaConfigs
import org.apache.kafka.common.errors.{InvalidReplicaAssignmentException, InvalidTopicException, TopicExistsException}
Expand All @@ -53,6 +53,21 @@ class AdminZkClientTest extends QuorumTestHarness with Logging with RackAwareTes
super.tearDown()
}

@Test
def testCreateTopicWithId(): Unit = {
val brokers = List(0, 1, 2, 3, 4)
TestUtils.createBrokersInZk(zkClient, brokers)

val topicConfig = new Properties()

val assignment = Map(0 -> List(0, 1, 2),
1 -> List(1, 2, 3))
val uuid = Uuid.randomUuid
adminZkClient.createTopicWithAssignment("test", topicConfig, assignment, usesTopicId = true, maybeTopicId = Some(uuid))
val found = zkClient.getTopicIdsForTopics(Set("test"))
assertEquals(uuid, found("test"))
}

@Test
def testManualReplicaAssignment(): Unit = {
val brokers = List(0, 1, 2, 3, 4)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -625,6 +625,15 @@ private ApiError createTopic(ControllerRequestContext context,
boolean authorizedToReturnConfigs) {
Map<String, String> creationConfigs = translateCreationConfigs(topic.configs());
Map<Integer, PartitionRegistration> newParts = new HashMap<>();
Uuid topicId;
if (topic.id() == null || topic.id() == Uuid.ZERO_UUID) {
topicId = Uuid.randomUuid();
} else {
if (topics.containsKey(topic.id())) {
return ApiError.fromThrowable(new InvalidTopicException("Topic id " + topic.id() + " already exists"));
}
topicId = topic.id();
}
if (!topic.assignments().isEmpty()) {
if (topic.replicationFactor() != -1) {
return new ApiError(INVALID_REQUEST,
Expand Down Expand Up @@ -725,7 +734,6 @@ private ApiError createTopic(ControllerRequestContext context,
numPartitions, e.throttleTimeMs());
return ApiError.fromThrowable(e);
}
Uuid topicId = Uuid.randomUuid();
CreatableTopicResult result = new CreatableTopicResult().
setName(topic.name()).
setTopicId(topicId).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,12 +233,14 @@ private ReplicationControlTestContext(
clusterControl.activate();
}

CreatableTopicResult createTestTopic(String name,
CreatableTopicResult createTestTopic(Uuid id,
String name,
int numPartitions,
short replicationFactor,
short expectedErrorCode) throws Exception {
CreateTopicsRequestData request = new CreateTopicsRequestData();
CreatableTopic topic = new CreatableTopic().setName(name);
if (id != null) topic.setId(id);
topic.setNumPartitions(numPartitions).setReplicationFactor(replicationFactor);
request.topics().add(topic);
ControllerRequestContext requestContext = anonymousContextFor(ApiKeys.CREATE_TOPICS);
Expand All @@ -253,6 +255,13 @@ CreatableTopicResult createTestTopic(String name,
return topicResult;
}

CreatableTopicResult createTestTopic(String name,
int numPartitions,
short replicationFactor,
short expectedErrorCode) throws Exception {
return createTestTopic(null, name, numPartitions, replicationFactor, expectedErrorCode);
}

CreatableTopicResult createTestTopic(String name, int[][] replicas) throws Exception {
return createTestTopic(name, replicas, Collections.emptyMap(), (short) 0);
}
Expand Down Expand Up @@ -752,6 +761,19 @@ public void testInvalidCreateTopicsWithValidateOnlyFlag() throws Exception {
assertEquals(expectedResponse, result.response());
}

@Test
public void testCreateTopicsWithId() throws Exception {
ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
ctx.registerBrokers(0, 1, 2);
ctx.unfenceBrokers(0, 1, 2);

Uuid id = Uuid.randomUuid();
CreatableTopicResult initialTopic = ctx.createTestTopic(id, "foo.bar", 2, (short) 2, NONE.code());
assertEquals(id, ctx.replicationControl.getTopic(initialTopic.topicId()).topicId());
CreatableTopicResult resultWithErrors = ctx.createTestTopic(id, "foo.baz", 2, (short) 2, INVALID_TOPIC_EXCEPTION.code());
assertEquals("Topic id " + id + " already exists", resultWithErrors.errorMessage());
}

@Test
public void testCreateTopicsWithPolicy() throws Exception {
MockCreateTopicPolicy createTopicPolicy = new MockCreateTopicPolicy(asList(
Expand Down
Loading