From bd88077eb4a2b1eedd0e7b5db8a47df7719f96d4 Mon Sep 17 00:00:00 2001 From: Jarkko Jaakola Date: Wed, 20 Nov 2024 08:56:22 +0200 Subject: [PATCH] mirrormaker: restrict the number of checkpoint connector tasks The MirrorCheckpointConnector uses the global tasks max for setting the maximum number of tasks. This can cause an extreme burden on a single Kafka broker when hundreds of checkpoint tasks are consuming from the single-partition topic. The new limit defaults to 1 and is configurable. --- .../mirror/MirrorCheckpointConfig.java | 14 ++++++ .../mirror/MirrorCheckpointConnector.java | 7 ++- .../mirror/MirrorCheckpointConnectorTest.java | 46 +++++++++++++++++++ 3 files changed, 65 insertions(+), 2 deletions(-) diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConfig.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConfig.java index 3fb2859d2dd46..9307820ba5051 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConfig.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConfig.java @@ -41,6 +41,10 @@ public class MirrorCheckpointConfig extends MirrorConnectorConfig { public static final String CHECKPOINTS_TOPIC_REPLICATION_FACTOR_DOC = "Replication factor for checkpoints topic."; public static final short CHECKPOINTS_TOPIC_REPLICATION_FACTOR_DEFAULT = 3; + protected static final String CHECKPOINTS_TASKS_MAXIMUM = "checkpoints.tasks.max"; + protected static final String CHECKPOINTS_TASKS_MAXIMUM_DOC = "Maximum number of checkpoint connector tasks."; + private static final int CHECKPOINTS_TASKS_MAX_DEFAULT = 1; + protected static final String TASK_CONSUMER_GROUPS = "task.assigned.groups"; public static final String CONSUMER_POLL_TIMEOUT_MILLIS = "consumer.poll.timeout.ms"; @@ -124,6 +128,10 @@ Duration syncGroupOffsetsInterval() { } } + Integer getCheckpointConnectorTaskMax() { + return getInt(CHECKPOINTS_TASKS_MAXIMUM); + } + Map<String, String>
taskConfigForConsumerGroups(List<String> groups, int taskIndex) { Map<String, String> props = originalsStrings(); props.put(TASK_CONSUMER_GROUPS, String.join(",", groups)); @@ -250,6 +258,12 @@ private static ConfigDef defineCheckpointConfig(ConfigDef baseConfig) { CHECKPOINTS_TOPIC_REPLICATION_FACTOR_DEFAULT, ConfigDef.Importance.LOW, CHECKPOINTS_TOPIC_REPLICATION_FACTOR_DOC) + .define( + CHECKPOINTS_TASKS_MAXIMUM, + ConfigDef.Type.INT, + CHECKPOINTS_TASKS_MAX_DEFAULT, + ConfigDef.Importance.LOW, + CHECKPOINTS_TASKS_MAXIMUM_DOC) .define( OFFSET_SYNCS_TOPIC_LOCATION, ConfigDef.Type.STRING, diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java index 0af3b14e3b8e9..2111ef8c0f4e0 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java @@ -146,8 +146,11 @@ public List<Map<String, String>> taskConfigs(int maxTasks) { || config.emitCheckpointsInterval().isNegative()) { return Collections.emptyList(); } - int numTasks = Math.min(maxTasks, knownConsumerGroups.size()); - List<List<String>> groupsPartitioned = ConnectorUtils.groupPartitions(new ArrayList<>(knownConsumerGroups), numTasks); + + final int limitCheckpointConnectorsTasks = Math.min(maxTasks, config.getCheckpointConnectorTaskMax()); + final int numTasks = Math.min(limitCheckpointConnectorsTasks, knownConsumerGroups.size()); + log.info("Limiting the CheckpointConnector tasks to {}", numTasks); + final List<List<String>> groupsPartitioned = ConnectorUtils.groupPartitions(new ArrayList<>(knownConsumerGroups), numTasks); return IntStream.range(0, numTasks) .mapToObj(i -> config.taskConfigForConsumerGroups(groupsPartitioned.get(i), i)) .collect(Collectors.toList()); diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnectorTest.java
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnectorTest.java index d726ee7c0ecb1..16e900db8dd81 100644 --- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnectorTest.java +++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnectorTest.java @@ -87,6 +87,52 @@ public void testMirrorCheckpointConnectorEnabled() { "MirrorCheckpointConnectorEnabled for " + CONSUMER_GROUP + " failed"); } + @Test + public void testMirrorCheckpointConnectorEnabledLimitMaxTasksDefaultOfOne() { + // enable the checkpoint emission + MirrorCheckpointConfig config = new MirrorCheckpointConfig( + makeProps("emit.checkpoints.enabled", "true")); + + Set<String> knownConsumerGroups = new HashSet<>(); + for (int i = 0; i < 500; i++) { + knownConsumerGroups.add(String.format("consumer-group-%d", i)); + } + // MirrorCheckpointConnector as minimum to run taskConfig() + MirrorCheckpointConnector connector = new MirrorCheckpointConnector(knownConsumerGroups, + config); + List<Map<String, String>> output = connector.taskConfigs(100); + // expect 1 task to be created, since checkpoints.tasks.max defaults to 1 + final int expectedTaskCount = 1; + assertEquals(expectedTaskCount, output.size(), "Expected number of tasks incorrect."); + final long groupsInTask = Arrays.stream(output.get(0).get(MirrorCheckpointConfig.TASK_CONSUMER_GROUPS).split(",")).count(); + assertEquals(500L, groupsInTask); + } + + @Test + public void testMirrorCheckpointConnectorEnabledLimitMaxTasksToFive() { + // enable the checkpoint emission + MirrorCheckpointConfig config = new MirrorCheckpointConfig( + makeProps("emit.checkpoints.enabled", "true", + "checkpoints.tasks.max", "5")); + + Set<String> knownConsumerGroups = new HashSet<>(); + for (int i = 0; i < 500; i++) { + knownConsumerGroups.add(String.format("consumer-group-%d", i)); + } + // MirrorCheckpointConnector as minimum to run taskConfig() + MirrorCheckpointConnector connector = new MirrorCheckpointConnector(knownConsumerGroups, + config); + List<Map<String, String>>
output = connector.taskConfigs(100); + // expect 5 tasks to be created + final int expectedTaskCount = 5; + assertEquals(expectedTaskCount, output.size(), "Expected number of tasks incorrect."); + output.forEach(task -> { + final long groupsInTask = Arrays.stream(task.get(MirrorCheckpointConfig.TASK_CONSUMER_GROUPS).split(",")).count(); + assertEquals(100L, groupsInTask); + }); + + } + @Test public void testNoConsumerGroup() { MirrorCheckpointConfig config = new MirrorCheckpointConfig(makeProps());