From 2267cb6ec832c505fce36594ae5f05a0d798130f Mon Sep 17 00:00:00 2001 From: Justine Olshan Date: Fri, 17 May 2024 21:35:28 -0700 Subject: [PATCH] KAFKA-16992: InvalidRequestException: ADD_PARTITIONS_TO_TXN with version 4 which is not enabled when upgrading from kafka (#15971) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit We weren't enabling discoverBrokerVersions to check the supported versions in the AddPartitionsToTxnManager. This means that any verification request (or any AddPartitionsToTxnRequest version) from a newer broker would fail when sending to an older broker. The bulk of this change is adding additional transactions system tests for old versions. One test upgrades the cluster completely. This didn't catch the issue but could be useful. The other test forces a new broker to send a verification request to an older one. Without the discoverBrokerVersions change, all tests between mixed brokers failed. (We introduced a new request version in 3.8 -- which is a separate version from the one that caused the bug for 3.5 -> 3.6) With the addition, the tests all passed. I also manually ran a test for 3.5 -> 3.6 since the issue there was slightly different and was caused by the unstableLatestVersion flag being enabled. This change should fix this as well. 👍 Reviewers: David Jacot --- .../main/java/kafka/server/NetworkUtils.java | 2 +- .../core/transactions_mixed_versions_test.py | 222 ++++++++++++++++ .../kafkatest/tests/core/transactions_test.py | 48 +--- .../tests/core/transactions_upgrade_test.py | 240 ++++++++++++++++++ tests/kafkatest/utils/transactions_utils.py | 55 ++++ 5 files changed, 528 insertions(+), 39 deletions(-) create mode 100644 tests/kafkatest/tests/core/transactions_mixed_versions_test.py create mode 100644 tests/kafkatest/tests/core/transactions_upgrade_test.py create mode 100644 tests/kafkatest/utils/transactions_utils.py diff --git a/core/src/main/java/kafka/server/NetworkUtils.java b/core/src/main/java/kafka/server/NetworkUtils.java index 87dc7e961074e..5607f2623f9c9 100644 --- a/core/src/main/java/kafka/server/NetworkUtils.java +++ b/core/src/main/java/kafka/server/NetworkUtils.java @@ -82,7 +82,7 @@ public static NetworkClient buildNetworkClient(String prefix, config.connectionSetupTimeoutMs(), config.connectionSetupTimeoutMaxMs(), time, - false, + true, new ApiVersions(), logContext ); diff --git a/tests/kafkatest/tests/core/transactions_mixed_versions_test.py b/tests/kafkatest/tests/core/transactions_mixed_versions_test.py new file mode 100644 index 0000000000000..20946ce776cff --- /dev/null +++ b/tests/kafkatest/tests/core/transactions_mixed_versions_test.py @@ -0,0 +1,222 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from kafkatest.services.kafka import KafkaService, quorum +from kafkatest.services.kafka.quorum import isolated_kraft +from kafkatest.services.console_consumer import ConsoleConsumer +from kafkatest.services.verifiable_producer import VerifiableProducer +from kafkatest.services.transactional_message_copier import TransactionalMessageCopier +from kafkatest.utils import is_int +from kafkatest.utils.transactions_utils import create_and_start_copiers +from kafkatest.version import LATEST_3_1, LATEST_3_2, LATEST_3_3, LATEST_3_4, LATEST_3_5, \ + DEV_BRANCH, KafkaVersion + +from ducktape.tests.test import Test +from ducktape.mark import matrix +from ducktape.mark.resource import cluster +from ducktape.utils.util import wait_until + +import time + +class TransactionsMixedVersionsTest(Test): + """Tests transactions by transactionally copying data from a source topic to + a destination between brokers with different image versions. This is how transactions work + while the cluster is undergoing an upgrade. + """ + def __init__(self, test_context): + """:type test_context: ducktape.tests.test.TestContext""" + super(TransactionsMixedVersionsTest, self).__init__(test_context=test_context) + + self.input_topic = "input-topic" + self.output_topic = "output-topic" + + self.num_brokers = 3 + + self.replication_factor = 3 + + # Test parameters + self.num_input_partitions = 1 + self.num_output_partitions = 1 + self.num_seed_messages = 1000 + self.transaction_size = 5 + + # The transaction timeout should be lower than the progress timeout, but at + # least as high as the request timeout (which is 30s by default). + self.transaction_timeout = 40000 + self.progress_timeout_sec = 60 + self.consumer_group = "transactions-test-consumer-group" + + def seed_messages(self, topic, num_seed_messages): + seed_timeout_sec = 10000 + seed_producer = VerifiableProducer(context=self.test_context, + num_nodes=1, + kafka=self.kafka, + topic=topic, + message_validator=is_int, + max_messages=num_seed_messages, + enable_idempotence=True) + seed_producer.start() + wait_until(lambda: seed_producer.num_acked >= num_seed_messages, + timeout_sec=seed_timeout_sec, + err_msg="Producer failed to produce messages %d in %ds." %\ + (self.num_seed_messages, seed_timeout_sec)) + return seed_producer.acked + + def get_messages_from_topic(self, topic, num_messages): + consumer = self.start_consumer(topic, group_id="verifying_consumer") + return self.drain_consumer(consumer, num_messages) + + def start_consumer(self, topic_to_read, group_id): + consumer = ConsoleConsumer(context=self.test_context, + num_nodes=1, + kafka=self.kafka, + topic=topic_to_read, + group_id=group_id, + message_validator=is_int, + from_beginning=True, + isolation_level="read_committed") + consumer.start() + # ensure that the consumer is up. + wait_until(lambda: (len(consumer.messages_consumed[1]) > 0) == True, + timeout_sec=60, + err_msg="Consumer failed to consume any messages for %ds" %\ + 60) + return consumer + + def drain_consumer(self, consumer, num_messages): + # wait until we read at least the expected number of messages. + # This is a safe check because both failure modes will be caught: + # 1. If we have 'num_seed_messages' but there are duplicates, then + # this is checked for later. + # + # 2. If we never reach 'num_seed_messages', then this will cause the + # test to fail. + wait_until(lambda: len(consumer.messages_consumed[1]) >= num_messages, + timeout_sec=90, + err_msg="Consumer consumed only %d out of %d messages in %ds" %\ + (len(consumer.messages_consumed[1]), num_messages, 90)) + consumer.stop() + return consumer.messages_consumed[1] + + def copy_messages_transactionally(self, input_topic, output_topic, + num_copiers, num_messages_to_copy, + use_group_metadata): + """Copies messages transactionally from the seeded input topic to the + output topic. + + This method also consumes messages in read_committed mode from the + output topic. + + It returns the concurrently consumed messages. + """ + + copiers = create_and_start_copiers(test_context=self.test_context, + kafka=self.kafka, + consumer_group=self.consumer_group, + input_topic=input_topic, + output_topic=output_topic, + transaction_size=self.transaction_size, + transaction_timeout=self.transaction_timeout, + num_copiers=num_copiers, + use_group_metadata=use_group_metadata) + concurrent_consumer = self.start_consumer(output_topic, + group_id="concurrent_consumer") + + copier_timeout_sec = 120 + for copier in copiers: + wait_until(lambda: copier.is_done, + timeout_sec=copier_timeout_sec, + err_msg="%s - Failed to copy all messages in %ds." %\ + (copier.transactional_id, copier_timeout_sec)) + self.logger.info("finished copying messages") + + return self.drain_consumer(concurrent_consumer, num_messages_to_copy) + + def setup_topics(self): + assignment = ":".join(map(str, [self.kafka.idx(node) for node in self.kafka.nodes])) + transaction_assignment = ",".join(map(str, [assignment[::-1]] * 50)) + self.kafka.topics = { + self.input_topic: { + "partitions": self.num_input_partitions, + "replication-factor": self.replication_factor, + "replica-assignment": assignment, + "configs": { + "min.insync.replicas": 2 + } + }, + self.output_topic: { + "partitions": self.num_output_partitions, + "replication-factor": self.replication_factor, + "replica-assignment": assignment, + "configs": { + "min.insync.replicas": 2 + } + }, + "__transaction_state": { + "partitions": 50, + "replication-factor": self.replication_factor, + "replica-assignment": transaction_assignment, + "configs": { + "min.insync.replicas": 2 + } + } + } + + @cluster(num_nodes=8) + @matrix( + old_kafka_version=[str(LATEST_3_5), str(LATEST_3_4), str(LATEST_3_3), str(LATEST_3_2), str(LATEST_3_1)], + metadata_quorum=[isolated_kraft] + ) + def test_transactions_mixed_versions(self, old_kafka_version, metadata_quorum=quorum.isolated_kraft): + oldKafkaVersion = KafkaVersion(old_kafka_version) + self.kafka = KafkaService(self.test_context, + num_nodes=self.num_brokers, + zk=None, + version=oldKafkaVersion, + controller_num_nodes_override=1) + + self.kafka.nodes[0].version = DEV_BRANCH + + security_protocol = 'PLAINTEXT' + self.kafka.security_protocol = security_protocol + self.kafka.interbroker_security_protocol = security_protocol + self.kafka.logs["kafka_data_1"]["collect_default"] = True + self.kafka.logs["kafka_data_2"]["collect_default"] = True + self.kafka.logs["kafka_operational_logs_debug"]["collect_default"] = True + + self.setup_topics() + self.kafka.start() + + input_messages = self.seed_messages(self.input_topic, self.num_seed_messages) + concurrently_consumed_messages = self.copy_messages_transactionally( + input_topic=self.input_topic, output_topic=self.output_topic, num_copiers=self.num_input_partitions, + num_messages_to_copy=self.num_seed_messages, use_group_metadata=True) + output_messages = self.get_messages_from_topic(self.output_topic, self.num_seed_messages) + + concurrently_consumed_message_set = set(concurrently_consumed_messages) + output_message_set = set(output_messages) + input_message_set = set(input_messages) + + num_dups = abs(len(output_messages) - len(output_message_set)) + num_dups_in_concurrent_consumer = abs(len(concurrently_consumed_messages) + - len(concurrently_consumed_message_set)) + assert num_dups == 0, "Detected %d duplicates in the output stream" % num_dups + assert input_message_set == output_message_set, "Input and output message sets are not equal. Num input messages %d. Num output messages %d" %\ + (len(input_message_set), len(output_message_set)) + + assert num_dups_in_concurrent_consumer == 0, "Detected %d dups in concurrently consumed messages" % num_dups_in_concurrent_consumer + assert input_message_set == concurrently_consumed_message_set, \ + "Input and concurrently consumed output message sets are not equal. Num input messages: %d. Num concurrently_consumed_messages: %d" %\ + (len(input_message_set), len(concurrently_consumed_message_set)) diff --git a/tests/kafkatest/tests/core/transactions_test.py b/tests/kafkatest/tests/core/transactions_test.py index 51ee971ef8fb5..ac4a68033897d 100644 --- a/tests/kafkatest/tests/core/transactions_test.py +++ b/tests/kafkatest/tests/core/transactions_test.py @@ -17,8 +17,8 @@ from kafkatest.services.kafka import KafkaService, quorum from kafkatest.services.console_consumer import ConsoleConsumer from kafkatest.services.verifiable_producer import VerifiableProducer -from kafkatest.services.transactional_message_copier import TransactionalMessageCopier from kafkatest.utils import is_int +from kafkatest.utils.transactions_utils import create_and_start_copiers from ducktape.tests.test import Test from ducktape.mark import matrix @@ -110,27 +110,6 @@ def bounce_brokers(self, clean_shutdown): self.kafka.await_no_under_replicated_partitions() - def create_and_start_message_copier(self, input_topic, input_partition, output_topic, transactional_id, use_group_metadata): - message_copier = TransactionalMessageCopier( - context=self.test_context, - num_nodes=1, - kafka=self.kafka, - transactional_id=transactional_id, - consumer_group=self.consumer_group, - input_topic=input_topic, - input_partition=input_partition, - output_topic=output_topic, - max_messages=-1, - transaction_size=self.transaction_size, - transaction_timeout=self.transaction_timeout, - use_group_metadata=use_group_metadata - ) - message_copier.start() - wait_until(lambda: message_copier.alive(message_copier.nodes[0]), - timeout_sec=10, - err_msg="Message copier failed to start after 10 s") - return message_copier - def bounce_copiers(self, copiers, clean_shutdown): for _ in range(3): for copier in copiers: @@ -142,18 +121,6 @@ def bounce_copiers(self, copiers, clean_shutdown): str(copier.progress_percent()))) copier.restart(clean_shutdown) - def create_and_start_copiers(self, input_topic, output_topic, num_copiers, use_group_metadata): - copiers = [] - for i in range(0, num_copiers): - copiers.append(self.create_and_start_message_copier( - input_topic=input_topic, - output_topic=output_topic, - input_partition=i, - transactional_id="copier-" + str(i), - use_group_metadata=use_group_metadata - )) - return copiers - def start_consumer(self, topic_to_read, group_id): consumer = ConsoleConsumer(context=self.test_context, num_nodes=1, @@ -199,10 +166,15 @@ def copy_messages_transactionally(self, failure_mode, bounce_target, It returns the concurrently consumed messages. """ - copiers = self.create_and_start_copiers(input_topic=input_topic, - output_topic=output_topic, - num_copiers=num_copiers, - use_group_metadata=use_group_metadata) + copiers = create_and_start_copiers(test_context=self.test_context, + kafka=self.kafka, + consumer_group=self.consumer_group, + input_topic=input_topic, + output_topic=output_topic, + transaction_size=self.transaction_size, + transaction_timeout=self.transaction_timeout, + num_copiers=num_copiers, + use_group_metadata=use_group_metadata) concurrent_consumer = self.start_consumer(output_topic, group_id="concurrent_consumer") clean_shutdown = False diff --git a/tests/kafkatest/tests/core/transactions_upgrade_test.py b/tests/kafkatest/tests/core/transactions_upgrade_test.py new file mode 100644 index 0000000000000..8bf5a00b96c3c --- /dev/null +++ b/tests/kafkatest/tests/core/transactions_upgrade_test.py @@ -0,0 +1,240 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from kafkatest.services.kafka import KafkaService, quorum +from kafkatest.services.kafka.quorum import isolated_kraft +from kafkatest.services.console_consumer import ConsoleConsumer +from kafkatest.services.verifiable_producer import VerifiableProducer +from kafkatest.services.transactional_message_copier import TransactionalMessageCopier +from kafkatest.utils import is_int +from kafkatest.utils.transactions_utils import create_and_start_copiers +from kafkatest.version import LATEST_3_1, LATEST_3_2, LATEST_3_3, LATEST_3_4, LATEST_3_5, \ + DEV_BRANCH, KafkaVersion, LATEST_METADATA_VERSION + +from ducktape.tests.test import Test +from ducktape.mark import matrix +from ducktape.mark.resource import cluster +from ducktape.utils.util import wait_until + +import time + +class TransactionsUpgradeTest(Test): + """Tests transactions by transactionally copying data from a source topic to + a destination topic while upgrading the cluster. In the end we verify that the final output + topic contains exactly one committed copy of each message in the input + topic. + """ + def __init__(self, test_context): + """:type test_context: ducktape.tests.test.TestContext""" + super(TransactionsUpgradeTest, self).__init__(test_context=test_context) + + self.input_topic = "input-topic" + self.output_topic = "output-topic" + + self.num_brokers = 3 + + self.replication_factor = 3 + + # Test parameters + self.num_input_partitions = 3 + self.num_output_partitions = 3 + self.num_seed_messages = 1000 + self.transaction_size = 5 + + # The transaction timeout should be lower than the progress timeout, but at + # least as high as the request timeout (which is 30s by default). + self.transaction_timeout = 40000 + self.progress_timeout_sec = 60 + self.consumer_group = "transactions-test-consumer-group" + + def seed_messages(self, topic, num_seed_messages): + seed_timeout_sec = 10000 + seed_producer = VerifiableProducer(context=self.test_context, + num_nodes=1, + kafka=self.kafka, + topic=topic, + message_validator=is_int, + max_messages=num_seed_messages, + enable_idempotence=True) + seed_producer.start() + wait_until(lambda: seed_producer.num_acked >= num_seed_messages, + timeout_sec=seed_timeout_sec, + err_msg="Producer failed to produce messages %d in %ds." %\ + (self.num_seed_messages, seed_timeout_sec)) + return seed_producer.acked + + def get_messages_from_topic(self, topic, num_messages): + consumer = self.start_consumer(topic, group_id="verifying_consumer") + return self.drain_consumer(consumer, num_messages) + + def start_consumer(self, topic_to_read, group_id): + consumer = ConsoleConsumer(context=self.test_context, + num_nodes=1, + kafka=self.kafka, + topic=topic_to_read, + group_id=group_id, + message_validator=is_int, + from_beginning=True, + isolation_level="read_committed") + consumer.start() + # ensure that the consumer is up. + wait_until(lambda: (len(consumer.messages_consumed[1]) > 0) == True, + timeout_sec=60, + err_msg="Consumer failed to consume any messages for %ds" %\ + 60) + return consumer + + def drain_consumer(self, consumer, num_messages): + # wait until we read at least the expected number of messages. + # This is a safe check because both failure modes will be caught: + # 1. If we have 'num_seed_messages' but there are duplicates, then + # this is checked for later. + # + # 2. If we never reach 'num_seed_messages', then this will cause the + # test to fail. + wait_until(lambda: len(consumer.messages_consumed[1]) >= num_messages, + timeout_sec=90, + err_msg="Consumer consumed only %d out of %d messages in %ds" %\ + (len(consumer.messages_consumed[1]), num_messages, 90)) + consumer.stop() + return consumer.messages_consumed[1] + + def wait_until_rejoin(self): + for partition in range(0, self.num_input_partitions): + wait_until(lambda: len(self.kafka.isr_idx_list(self.input_topic, partition)) == self.replication_factor, timeout_sec=60, + backoff_sec=1, err_msg="Replicas did not rejoin the ISR in a reasonable amount of time") + + for partition in range(0, self.num_output_partitions): + wait_until(lambda: len(self.kafka.isr_idx_list(self.output_topic, partition)) == self.replication_factor, timeout_sec=60, + backoff_sec=1, err_msg="Replicas did not rejoin the ISR in a reasonable amount of time") + + def perform_upgrade(self, from_kafka_version): + self.logger.info("Performing rolling upgrade.") + for node in self.kafka.controller_quorum.nodes: + self.logger.info("Stopping controller node %s" % node.account.hostname) + self.kafka.controller_quorum.stop_node(node) + node.version = DEV_BRANCH + self.logger.info("Restarting controller node %s" % node.account.hostname) + self.kafka.controller_quorum.start_node(node) + self.wait_until_rejoin() + self.logger.info("Successfully restarted controller node %s" % node.account.hostname) + for node in self.kafka.nodes: + self.logger.info("Stopping broker node %s" % node.account.hostname) + self.kafka.stop_node(node) + node.version = DEV_BRANCH + self.logger.info("Restarting broker node %s" % node.account.hostname) + self.kafka.start_node(node) + self.wait_until_rejoin() + self.logger.info("Successfully restarted broker node %s" % node.account.hostname) + self.logger.info("Changing metadata.version to %s" % LATEST_METADATA_VERSION) + self.kafka.upgrade_metadata_version(LATEST_METADATA_VERSION) + + def copy_messages_transactionally_during_upgrade(self, input_topic, output_topic, + num_copiers, num_messages_to_copy, + use_group_metadata, from_kafka_version): + """Copies messages transactionally from the seeded input topic to the + output topic while an rolling upgrade occurs. + + This method also consumes messages in read_committed mode from the + output topic. + + It returns the concurrently consumed messages. + """ + self.perform_upgrade(from_kafka_version) + + copiers = create_and_start_copiers(test_context=self.test_context, + kafka=self.kafka, + consumer_group=self.consumer_group, + input_topic=input_topic, + output_topic=output_topic, + transaction_size=self.transaction_size, + transaction_timeout=self.transaction_timeout, + num_copiers=num_copiers, + use_group_metadata=use_group_metadata) + concurrent_consumer = self.start_consumer(output_topic, + group_id="concurrent_consumer") + + copier_timeout_sec = 120 + for copier in copiers: + wait_until(lambda: copier.is_done, + timeout_sec=copier_timeout_sec, + err_msg="%s - Failed to copy all messages in %ds." %\ + (copier.transactional_id, copier_timeout_sec)) + self.logger.info("finished copying messages") + + return self.drain_consumer(concurrent_consumer, num_messages_to_copy) + + def setup_topics(self): + self.kafka.topics = { + self.input_topic: { + "partitions": self.num_input_partitions, + "replication-factor": self.replication_factor, + "configs": { + "min.insync.replicas": 2 + } + }, + self.output_topic: { + "partitions": self.num_output_partitions, + "replication-factor": self.replication_factor, + "configs": { + "min.insync.replicas": 2 + } + } + } + + @cluster(num_nodes=10) + @matrix( + from_kafka_version=[str(LATEST_3_5), str(LATEST_3_4), str(LATEST_3_3), str(LATEST_3_2), str(LATEST_3_1)], + metadata_quorum=[isolated_kraft] + ) + def test_transactions_upgrade(self, from_kafka_version, metadata_quorum=quorum.isolated_kraft): + fromKafkaVersion = KafkaVersion(from_kafka_version) + self.kafka = KafkaService(self.test_context, + num_nodes=self.num_brokers, + zk=None, + version=fromKafkaVersion, + controller_num_nodes_override=1) + + security_protocol = 'PLAINTEXT' + self.kafka.security_protocol = security_protocol + self.kafka.interbroker_security_protocol = security_protocol + self.kafka.logs["kafka_data_1"]["collect_default"] = True + self.kafka.logs["kafka_data_2"]["collect_default"] = True + self.kafka.logs["kafka_operational_logs_debug"]["collect_default"] = True + + self.setup_topics() + self.kafka.start() + + input_messages = self.seed_messages(self.input_topic, self.num_seed_messages) + concurrently_consumed_messages = self.copy_messages_transactionally_during_upgrade( + input_topic=self.input_topic, output_topic=self.output_topic, num_copiers=self.num_input_partitions, + num_messages_to_copy=self.num_seed_messages, use_group_metadata=True, from_kafka_version=from_kafka_version) + output_messages = self.get_messages_from_topic(self.output_topic, self.num_seed_messages) + + concurrently_consumed_message_set = set(concurrently_consumed_messages) + output_message_set = set(output_messages) + input_message_set = set(input_messages) + + num_dups = abs(len(output_messages) - len(output_message_set)) + num_dups_in_concurrent_consumer = abs(len(concurrently_consumed_messages) + - len(concurrently_consumed_message_set)) + assert num_dups == 0, "Detected %d duplicates in the output stream" % num_dups + assert input_message_set == output_message_set, "Input and output message sets are not equal. Num input messages %d. Num output messages %d" %\ + (len(input_message_set), len(output_message_set)) + + assert num_dups_in_concurrent_consumer == 0, "Detected %d dups in concurrently consumed messages" % num_dups_in_concurrent_consumer + assert input_message_set == concurrently_consumed_message_set, \ + "Input and concurrently consumed output message sets are not equal. Num input messages: %d. Num concurrently_consumed_messages: %d" %\ + (len(input_message_set), len(concurrently_consumed_message_set)) diff --git a/tests/kafkatest/utils/transactions_utils.py b/tests/kafkatest/utils/transactions_utils.py new file mode 100644 index 0000000000000..f84c49e2b0a5c --- /dev/null +++ b/tests/kafkatest/utils/transactions_utils.py @@ -0,0 +1,55 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from kafkatest.services.transactional_message_copier import TransactionalMessageCopier + +from ducktape.utils.util import wait_until + +def create_and_start_message_copier(test_context, kafka, consumer_group, input_topic, input_partition, + output_topic, transaction_size, transaction_timeout, transactional_id, use_group_metadata): + message_copier = TransactionalMessageCopier( + context=test_context, + num_nodes=1, + kafka=kafka, + transactional_id=transactional_id, + consumer_group=consumer_group, + input_topic=input_topic, + input_partition=input_partition, + output_topic=output_topic, + max_messages=-1, + transaction_size=transaction_size, + transaction_timeout=transaction_timeout, + use_group_metadata=use_group_metadata + ) + message_copier.start() + wait_until(lambda: message_copier.alive(message_copier.nodes[0]), + timeout_sec=10, + err_msg="Message copier failed to start after 10 s") + return message_copier + +def create_and_start_copiers(test_context, kafka, consumer_group, input_topic, output_topic, transaction_size, + transaction_timeout, num_copiers, use_group_metadata): + copiers = [] + for i in range(0, num_copiers): + copiers.append(create_and_start_message_copier( + test_context=test_context, + kafka=kafka, + consumer_group=consumer_group, + input_topic=input_topic, + output_topic=output_topic, + input_partition=i, + transaction_size=transaction_size, + transaction_timeout=transaction_timeout, + transactional_id="copier-" + str(i), + use_group_metadata=use_group_metadata + )) + return copiers