diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConfig.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConfig.java
index 3fb2859d2dd46..9307820ba5051 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConfig.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConfig.java
@@ -41,6 +41,11 @@ public class MirrorCheckpointConfig extends MirrorConnectorConfig {
     public static final String CHECKPOINTS_TOPIC_REPLICATION_FACTOR_DOC = "Replication factor for checkpoints topic.";
     public static final short CHECKPOINTS_TOPIC_REPLICATION_FACTOR_DEFAULT = 3;
 
+    // Upper bound on the number of tasks MirrorCheckpointConnector creates; validated to be at least 1.
+    protected static final String CHECKPOINTS_TASKS_MAXIMUM = "checkpoints.tasks.max";
+    protected static final String CHECKPOINTS_TASKS_MAXIMUM_DOC = "Maximum number of checkpoint connector tasks.";
+    private static final int CHECKPOINTS_TASKS_MAXIMUM_DEFAULT = 1;
+
     protected static final String TASK_CONSUMER_GROUPS = "task.assigned.groups";
 
     public static final String CONSUMER_POLL_TIMEOUT_MILLIS = "consumer.poll.timeout.ms";
@@ -124,6 +129,13 @@ Duration syncGroupOffsetsInterval() {
         }
     }
 
+    /**
+     * Returns the configured upper bound on checkpoint connector tasks ({@code checkpoints.tasks.max}).
+     */
+    Integer getCheckpointConnectorTaskMax() {
+        return getInt(CHECKPOINTS_TASKS_MAXIMUM);
+    }
+
     Map<String, String> taskConfigForConsumerGroups(List<String> groups, int taskIndex) {
         Map<String, String> props = originalsStrings();
         props.put(TASK_CONSUMER_GROUPS, String.join(",", groups));
@@ -250,6 +262,13 @@ private static ConfigDef defineCheckpointConfig(ConfigDef baseConfig) {
                         CHECKPOINTS_TOPIC_REPLICATION_FACTOR_DEFAULT,
                         ConfigDef.Importance.LOW,
                         CHECKPOINTS_TOPIC_REPLICATION_FACTOR_DOC)
+                .define(
+                        CHECKPOINTS_TASKS_MAXIMUM,
+                        ConfigDef.Type.INT,
+                        CHECKPOINTS_TASKS_MAXIMUM_DEFAULT,
+                        ConfigDef.Range.atLeast(1),
+                        ConfigDef.Importance.LOW,
+                        CHECKPOINTS_TASKS_MAXIMUM_DOC)
                 .define(
                         OFFSET_SYNCS_TOPIC_LOCATION,
                         ConfigDef.Type.STRING,
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java
index 0af3b14e3b8e9..2111ef8c0f4e0 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java
@@ -146,8 +146,13 @@ public List<Map<String, String>> taskConfigs(int maxTasks) {
                 || config.emitCheckpointsInterval().isNegative()) {
             return Collections.emptyList();
         }
-        int numTasks = Math.min(maxTasks, knownConsumerGroups.size());
-        List<List<String>> groupsPartitioned = ConnectorUtils.groupPartitions(new ArrayList<>(knownConsumerGroups), numTasks);
+
+        // The task count is capped three ways: by the maxTasks argument, by
+        // checkpoints.tasks.max, and by the number of known consumer groups.
+        final int maxCheckpointTasks = Math.min(maxTasks, config.getCheckpointConnectorTaskMax());
+        final int numTasks = Math.min(maxCheckpointTasks, knownConsumerGroups.size());
+        log.info("Limiting the CheckpointConnector tasks to {}", numTasks);
+        final List<List<String>> groupsPartitioned = ConnectorUtils.groupPartitions(new ArrayList<>(knownConsumerGroups), numTasks);
         return IntStream.range(0, numTasks)
                 .mapToObj(i -> config.taskConfigForConsumerGroups(groupsPartitioned.get(i), i))
                 .collect(Collectors.toList());
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnectorTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnectorTest.java
index d726ee7c0ecb1..16e900db8dd81 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnectorTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnectorTest.java
@@ -87,6 +87,60 @@ public void testMirrorCheckpointConnectorEnabled() {
                 "MirrorCheckpointConnectorEnabled for " + CONSUMER_GROUP + " failed");
     }
 
+    @Test
+    public void testMirrorCheckpointConnectorEnabledLimitMaxTasksDefaultOfOne() {
+        // enable the checkpoint emission; checkpoints.tasks.max is left at its default of 1
+        MirrorCheckpointConfig config = new MirrorCheckpointConfig(
+            makeProps("emit.checkpoints.enabled", "true"));
+
+        Set<String> knownConsumerGroups = new HashSet<>();
+        for (int i = 0; i < 500; i++) {
+            knownConsumerGroups.add(String.format("consumer-group-%d", i));
+        }
+        // MirrorCheckpointConnector as minimum to run taskConfig()
+        MirrorCheckpointConnector connector = new MirrorCheckpointConnector(knownConsumerGroups,
+            config);
+        List<Map<String, String>> output = connector.taskConfigs(100);
+        // expect a single task that is assigned all 500 consumer groups
+        final int expectedTaskCount = 1;
+        assertEquals(expectedTaskCount, output.size(), "Expected number of tasks incorrect.");
+        final int groupsInTask = output.get(0).get(MirrorCheckpointConfig.TASK_CONSUMER_GROUPS).split(",").length;
+        assertEquals(500, groupsInTask);
+    }
+
+    @Test
+    public void testMirrorCheckpointConnectorEnabledLimitMaxTasksToFive() {
+        // enable the checkpoint emission and allow up to five checkpoint tasks
+        MirrorCheckpointConfig config = new MirrorCheckpointConfig(
+            makeProps("emit.checkpoints.enabled", "true",
+                "checkpoints.tasks.max", "5"));
+
+        Set<String> knownConsumerGroups = new HashSet<>();
+        for (int i = 0; i < 500; i++) {
+            knownConsumerGroups.add(String.format("consumer-group-%d", i));
+        }
+        // MirrorCheckpointConnector as minimum to run taskConfig()
+        MirrorCheckpointConnector connector = new MirrorCheckpointConnector(knownConsumerGroups,
+            config);
+        List<Map<String, String>> output = connector.taskConfigs(100);
+        // expect 5 tasks, each assigned 100 of the 500 consumer groups
+        final int expectedTaskCount = 5;
+        assertEquals(expectedTaskCount, output.size(), "Expected number of tasks incorrect.");
+        output.forEach(task -> {
+            final int groupsInTask = task.get(MirrorCheckpointConfig.TASK_CONSUMER_GROUPS).split(",").length;
+            assertEquals(100, groupsInTask);
+        });
+    }
+
+    @Test
+    public void testMirrorCheckpointConnectorRejectsNonPositiveMaxTasks() {
+        // a limit below 1 would make taskConfigs() ask for a non-positive number of partitions,
+        // so it must be rejected when the config is parsed
+        org.junit.jupiter.api.Assertions.assertThrows(
+            org.apache.kafka.common.config.ConfigException.class,
+            () -> new MirrorCheckpointConfig(makeProps("checkpoints.tasks.max", "0")));
+    }
+
     @Test
     public void testNoConsumerGroup() {
         MirrorCheckpointConfig config = new MirrorCheckpointConfig(makeProps());