From 3a4e749a9269a638e6c832a7e796ef4a63080894 Mon Sep 17 00:00:00 2001 From: Enrico Olivelli Date: Thu, 9 Mar 2023 20:41:29 +0100 Subject: [PATCH] [transactions] Implement KIP-664 listTransactions (#76) (cherry picked from commit 5ef4a8531cde389af898cf36f57c518b340e426c) --- .../handlers/kop/KafkaCommandDecoder.java | 6 + .../handlers/kop/KafkaRequestHandler.java | 26 +++- .../coordinator/group/GroupCoordinator.java | 15 +-- .../transaction/TransactionCoordinator.java | 18 ++- .../transaction/TransactionState.java | 22 ++++ .../transaction/TransactionStateManager.java | 67 ++++++++++ .../kop/utils/KafkaResponseUtils.java | 19 +-- .../group/GroupCoordinatorTest.java | 22 ++-- .../transaction/TransactionTest.java | 114 ++++++++++++++++++ 9 files changed, 279 insertions(+), 30 deletions(-) diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java index ad7b3f9d61..6e61ceb50b 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java @@ -314,6 +314,9 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception case LIST_GROUPS: handleListGroupsRequest(kafkaHeaderAndRequest, responseFuture); break; + case LIST_TRANSACTIONS: + handleListTransactionsRequest(kafkaHeaderAndRequest, responseFuture); + break; case DELETE_GROUPS: handleDeleteGroupsRequest(kafkaHeaderAndRequest, responseFuture); break; @@ -572,6 +575,9 @@ protected void handleError(KafkaHeaderAndRequest kafkaHeaderAndRequest, protected abstract void handleListGroupsRequest(KafkaHeaderAndRequest listGroups, CompletableFuture response); + protected abstract void + handleListTransactionsRequest(KafkaHeaderAndRequest listGroups, CompletableFuture response); + protected abstract void handleDeleteGroupsRequest(KafkaHeaderAndRequest deleteGroups, CompletableFuture response); diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java index 466da180d0..cf69d9b4f1 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java @@ -31,6 +31,7 @@ import io.streamnative.pulsar.handlers.kop.exceptions.KoPTopicException; import io.streamnative.pulsar.handlers.kop.offset.OffsetAndMetadata; import io.streamnative.pulsar.handlers.kop.offset.OffsetMetadata; +import io.streamnative.pulsar.handlers.kop.scala.Either; import io.streamnative.pulsar.handlers.kop.security.SaslAuthenticator; import io.streamnative.pulsar.handlers.kop.security.Session; import io.streamnative.pulsar.handlers.kop.security.auth.Authorizer; @@ -125,6 +126,7 @@ import org.apache.kafka.common.message.LeaveGroupRequestData; import org.apache.kafka.common.message.ListOffsetsRequestData; import org.apache.kafka.common.message.ListOffsetsResponseData; +import org.apache.kafka.common.message.ListTransactionsResponseData; import org.apache.kafka.common.message.OffsetCommitRequestData; import org.apache.kafka.common.message.ProduceRequestData; import org.apache.kafka.common.message.SaslAuthenticateResponseData; @@ -173,6 +175,8 @@ import org.apache.kafka.common.requests.ListOffsetRequestV0; import org.apache.kafka.common.requests.ListOffsetsRequest; import org.apache.kafka.common.requests.ListOffsetsResponse; +import org.apache.kafka.common.requests.ListTransactionsRequest; +import org.apache.kafka.common.requests.ListTransactionsResponse; import org.apache.kafka.common.requests.MetadataRequest; import org.apache.kafka.common.requests.MetadataResponse.PartitionMetadata; import org.apache.kafka.common.requests.MetadataResponse.TopicMetadata; @@ -2038,8 +2042,26 @@ protected void handleDescribeGroupRequest(KafkaHeaderAndRequest describeGroup, protected void handleListGroupsRequest(KafkaHeaderAndRequest listGroups, CompletableFuture resultFuture) { checkArgument(listGroups.getRequest() instanceof ListGroupsRequest); - KeyValue> listResult = getGroupCoordinator().handleListGroups(); - resultFuture.complete(KafkaResponseUtils.newListGroups(listResult.getKey(), listResult.getValue())); + Either> listResult = getGroupCoordinator().handleListGroups(); + resultFuture.complete(KafkaResponseUtils.newListGroups(listResult)); + } + + @Override + protected void handleListTransactionsRequest(KafkaHeaderAndRequest listTransactions, + CompletableFuture resultFuture) { + checkArgument(listTransactions.getRequest() instanceof ListTransactionsRequest); + ListTransactionsRequest request = (ListTransactionsRequest) listTransactions.getRequest(); + List stateFilters = request.data().stateFilters(); + if (stateFilters == null) { + stateFilters = Collections.emptyList(); + } + List producerIdFilters = request.data().producerIdFilters(); + if (producerIdFilters == null) { + producerIdFilters = Collections.emptyList(); + } + ListTransactionsResponseData listResult = getTransactionCoordinator() + .handleListTransactions(stateFilters, producerIdFilters); + resultFuture.complete(new ListTransactionsResponse(listResult)); } @Override diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/group/GroupCoordinator.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/group/GroupCoordinator.java index f1f5d90ac1..98813604f7 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/group/GroupCoordinator.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/group/GroupCoordinator.java @@ -29,6 +29,7 @@ import io.streamnative.pulsar.handlers.kop.coordinator.group.GroupMetadata.GroupOverview; import io.streamnative.pulsar.handlers.kop.coordinator.group.GroupMetadata.GroupSummary; import io.streamnative.pulsar.handlers.kop.offset.OffsetAndMetadata; +import io.streamnative.pulsar.handlers.kop.scala.Either; import io.streamnative.pulsar.handlers.kop.utils.CoreUtils; import io.streamnative.pulsar.handlers.kop.utils.delayed.DelayedOperationKey.GroupKey; import io.streamnative.pulsar.handlers.kop.utils.delayed.DelayedOperationKey.MemberKey; @@ -832,22 +833,16 @@ public KeyValue> handleFetchOffsets( ); } - public KeyValue> handleListGroups() { + public Either> handleListGroups() { if (!isActive.get()) { - return new KeyValue<>(Errors.COORDINATOR_NOT_AVAILABLE, new ArrayList<>()); + return Either.left(Errors.COORDINATOR_NOT_AVAILABLE); } else { - Errors errors; if (groupManager.isLoading()) { - errors = Errors.COORDINATOR_LOAD_IN_PROGRESS; - } else { - errors = Errors.NONE; + return Either.left(Errors.COORDINATOR_LOAD_IN_PROGRESS); } List overviews = new ArrayList<>(); groupManager.currentGroups().forEach(group -> overviews.add(group.overview())); - return new KeyValue<>( - errors, - overviews - ); + return Either.right(overviews); } } diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionCoordinator.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionCoordinator.java index 498cbb0e2d..f438c6790f 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionCoordinator.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionCoordinator.java @@ -32,6 +32,7 @@ import io.streamnative.pulsar.handlers.kop.storage.PulsarTopicProducerStateManagerSnapshotBuffer; import io.streamnative.pulsar.handlers.kop.utils.MetadataUtils; import io.streamnative.pulsar.handlers.kop.utils.ProducerIdAndEpoch; +import java.util.List; import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; @@ -51,6 +52,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.internals.Topic; +import org.apache.kafka.common.message.ListTransactionsResponseData; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.record.RecordBatch; import org.apache.kafka.common.requests.TransactionResult; @@ -79,6 +81,8 @@ public class TransactionCoordinator { private final Time time; + private final AtomicBoolean isActive = new AtomicBoolean(false); + private static final BiConsumer onEndTransactionComplete = (txnIdAndPidEpoch, errors) -> { @@ -215,6 +219,17 @@ public static String getTopicPartitionName(String topicPartitionName, int partit return topicPartitionName + PARTITIONED_TOPIC_SUFFIX + partitionId; } + public ListTransactionsResponseData handleListTransactions(List filteredStates, + List filteredProducerIds) { + // https://github.com/apache/kafka/blob/915991445fde106d02e61a70425ae2601c813db0/core/ + // src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala#L259 + if (!isActive.get()) { + log.warn("The transaction coordinator is not active, so it will reject list transaction request"); + return new ListTransactionsResponseData().setErrorCode(Errors.NOT_COORDINATOR.code()); + } + return this.txnManager.listTransactionStates(filteredProducerIds, filteredStates); + } + @Data @EqualsAndHashCode @AllArgsConstructor @@ -925,7 +940,8 @@ public CompletableFuture startup(boolean enableTransactionalIdExpiration) txnManager.startup(enableTransactionalIdExpiration); return this.producerIdManager.initialize().thenCompose(ignored -> { - log.info("Startup transaction coordinator complete."); + log.info("{} Startup transaction coordinator complete.", namespacePrefixForMetadata); + isActive.set(true); return CompletableFuture.completedFuture(null); }); } diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionState.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionState.java index 7919449097..01d626f391 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionState.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionState.java @@ -117,4 +117,26 @@ public boolean isExpirationAllowed() { return false; } } + + public org.apache.kafka.clients.admin.TransactionState toAdminState() { + switch (this) { + case EMPTY: + return org.apache.kafka.clients.admin.TransactionState.EMPTY; + case ONGOING: + return org.apache.kafka.clients.admin.TransactionState.ONGOING; + case PREPARE_COMMIT: + return org.apache.kafka.clients.admin.TransactionState.PREPARE_COMMIT; + case PREPARE_ABORT: + return org.apache.kafka.clients.admin.TransactionState.PREPARE_ABORT; + case COMPLETE_COMMIT: + return org.apache.kafka.clients.admin.TransactionState.COMPLETE_COMMIT; + case COMPLETE_ABORT: + return org.apache.kafka.clients.admin.TransactionState.COMPLETE_ABORT; + case PREPARE_EPOCH_FENCE: + return org.apache.kafka.clients.admin.TransactionState.PREPARE_EPOCH_FENCE; + case DEAD: + default: + return org.apache.kafka.clients.admin.TransactionState.UNKNOWN; + } + } } diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionStateManager.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionStateManager.java index 2855f88429..9c62affed7 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionStateManager.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionStateManager.java @@ -25,6 +25,7 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; @@ -40,6 +41,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.common.concurrent.FutureUtils; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.message.ListTransactionsResponseData; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.types.SchemaException; import org.apache.kafka.common.requests.ProduceResponse; @@ -244,6 +246,71 @@ private boolean shouldExpire(TransactionMetadata txnMetadata, Long currentTimeMs <= (currentTimeMs - transactionConfig.getTransactionalIdExpirationMs()); } + private static boolean shouldInclude(TransactionMetadata txnMetadata, + List filterProducerIds, Set filterStateNames) { + if (txnMetadata.getState() == TransactionState.DEAD) { + // We filter the `Dead` state since it is a transient state which + // indicates that the transactionalId and its metadata are in the + // process of expiration and removal. + return false; + } else if (!filterProducerIds.isEmpty() && !filterProducerIds.contains(txnMetadata.getProducerId())) { + return false; + } else if (!filterStateNames.isEmpty() && !filterStateNames.contains( + txnMetadata.getState().toAdminState().toString())) { + return false; + } else { + return true; + } + } + + public ListTransactionsResponseData listTransactionStates(List filteredProducerIds, + List filteredStates) { + return CoreUtils.inReadLock(stateLock, () -> { + ListTransactionsResponseData response = new ListTransactionsResponseData(); + if (!loadingPartitions.isEmpty()) { + response.setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code()); + } else { + Set filterStates = new HashSet<>(); + for (TransactionState stateName : TransactionState.values()) { + String nameForTheClient = stateName.toAdminState().toString(); + if (filteredStates.contains(nameForTheClient)) { + filterStates.add(nameForTheClient); + } else { + response.unknownStateFilters().add(nameForTheClient); + } + } + List states = new ArrayList<>(); + transactionMetadataCache.forEach((__, cache) -> { + cache.values().forEach(txnMetadata -> { + txnMetadata.inLock(() -> { + // use toString() to get the name of the state according to the protocol + ListTransactionsResponseData.TransactionState transactionState = + new ListTransactionsResponseData.TransactionState() + .setTransactionalId(txnMetadata.getTransactionalId()) + .setProducerId(txnMetadata.getProducerId()) + .setTransactionState(txnMetadata.getState().toAdminState().toString()); + + if (shouldInclude(txnMetadata, filteredProducerIds, filterStates)) { + if (log.isDebugEnabled()) { + log.debug("add transaction state: {}", transactionState); + } + states.add(transactionState); + } else { + if (log.isDebugEnabled()) { + log.debug("Skip transaction state: {}", transactionState); + } + } + return null; + }); + }); + }); + response.setErrorCode(Errors.NONE.code()) + .setTransactionStates(states); + } + return response; + }); + } + @Data @AllArgsConstructor private static class TransactionalIdCoordinatorEpochAndMetadata { diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/KafkaResponseUtils.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/KafkaResponseUtils.java index f20fc5251c..d779862b87 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/KafkaResponseUtils.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/KafkaResponseUtils.java @@ -15,6 +15,7 @@ import io.streamnative.pulsar.handlers.kop.ApiVersion; import io.streamnative.pulsar.handlers.kop.coordinator.group.GroupMetadata; +import io.streamnative.pulsar.handlers.kop.scala.Either; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -278,14 +279,18 @@ public static LeaveGroupResponse newLeaveGroup(Errors errors) { return new LeaveGroupResponse(data); } - public static ListGroupsResponse newListGroups(Errors errors, - List groups) { + public static ListGroupsResponse newListGroups(Either> results) { ListGroupsResponseData data = new ListGroupsResponseData(); - data.setErrorCode(errors.code()); - data.setGroups(groups.stream().map(overView -> new ListGroupsResponseData.ListedGroup() - .setGroupId(overView.groupId()) - .setProtocolType(overView.protocolType())) - .collect(Collectors.toList())); + data.setErrorCode(results.isLeft() ? results.getLeft().code() : Errors.NONE.code()); + if (!results.isLeft()) { + data.setGroups(results.getRight().stream().map(overView -> new ListGroupsResponseData.ListedGroup() + .setGroupId(overView.groupId()) + .setProtocolType(overView.protocolType())) + .collect(Collectors.toList())); + + } else { + data.setGroups(Collections.emptyList()); + } return new ListGroupsResponse(data); } diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/coordinator/group/GroupCoordinatorTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/coordinator/group/GroupCoordinatorTest.java index 105958975d..3696b926dc 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/coordinator/group/GroupCoordinatorTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/coordinator/group/GroupCoordinatorTest.java @@ -15,6 +15,7 @@ import static org.mockito.Mockito.spy; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotEquals; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; @@ -28,6 +29,7 @@ import io.streamnative.pulsar.handlers.kop.coordinator.group.GroupMetadata.GroupSummary; import io.streamnative.pulsar.handlers.kop.coordinator.group.MemberMetadata.MemberSummary; import io.streamnative.pulsar.handlers.kop.offset.OffsetAndMetadata; +import io.streamnative.pulsar.handlers.kop.scala.Either; import io.streamnative.pulsar.handlers.kop.utils.delayed.DelayedOperationPurgatory; import io.streamnative.pulsar.handlers.kop.utils.timer.MockTimer; import java.util.ArrayList; @@ -218,8 +220,8 @@ public void testRequestHandlingWhileLoadingInProgress() throws Exception { assertEquals(Errors.COORDINATOR_LOAD_IN_PROGRESS, describeGroupResult.getKey()); // ListGroups - KeyValue> listGroupsResult = groupCoordinator.handleListGroups(); - assertEquals(Errors.COORDINATOR_LOAD_IN_PROGRESS, listGroupsResult.getKey()); + Either> listGroupsResult = groupCoordinator.handleListGroups(); + assertEquals(Errors.COORDINATOR_LOAD_IN_PROGRESS, listGroupsResult.getLeft()); // DeleteGroups Map deleteGroupsErrors = groupCoordinator.handleDeleteGroups( @@ -1695,12 +1697,12 @@ groupId, memberId, protocolType, newProtocols() ).get(); assertEquals(Errors.NONE, syncGroupResult.getKey()); - KeyValue> groups = groupCoordinator.handleListGroups(); - assertEquals(Errors.NONE, groups.getKey()); - assertEquals(1, groups.getValue().size()); + Either> groups = groupCoordinator.handleListGroups(); + assertFalse(groups.isLeft()); + assertEquals(1, groups.getRight().size()); assertEquals( new GroupOverview("groupId", "consumer"), - groups.getValue().get(0) + groups.getRight().get(0) ); } @@ -1712,12 +1714,12 @@ groupId, memberId, protocolType, newProtocols() ); assertEquals(Errors.NONE, joinGroupResult.getError()); - KeyValue> groups = groupCoordinator.handleListGroups(); - assertEquals(Errors.NONE, groups.getKey()); - assertEquals(1, groups.getValue().size()); + Either> groups = groupCoordinator.handleListGroups(); + assertFalse(groups.isLeft()); + assertEquals(1, groups.getRight().size()); assertEquals( new GroupOverview("groupId", "consumer"), - groups.getValue().get(0) + groups.getRight().get(0) ); } diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionTest.java index 4432e886b5..df0f599750 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionTest.java @@ -31,6 +31,7 @@ import java.time.temporal.ChronoUnit; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -46,7 +47,10 @@ import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.RandomStringUtils; import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.ListTransactionsOptions; +import org.apache.kafka.clients.admin.ListTransactionsResult; import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.admin.TransactionListing; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; @@ -1320,6 +1324,116 @@ public void testNotFencedWithBeginTransaction() throws Exception { producer2.close(); } + @Test(timeOut = 100000 * 30) + public void testListTransactions() throws Exception { + + String topicName = "testListTransactions"; + String transactionalId = "myProducer_" + UUID.randomUUID(); + + @Cleanup + KafkaProducer producer = buildTransactionProducer(transactionalId); + @Cleanup + AdminClient kafkaAdmin = AdminClient.create(newKafkaAdminClientProperties()); + + producer.initTransactions(); + producer.beginTransaction(); + assertTransactionState(kafkaAdmin, transactionalId, + org.apache.kafka.clients.admin.TransactionState.EMPTY); + producer.send(new ProducerRecord<>(topicName, 1, "bar")).get(); + producer.flush(); + + ListTransactionsResult listTransactionsResult = kafkaAdmin.listTransactions(); + listTransactionsResult.all().get().forEach(t -> { + log.info("Found transactionalId: {} {} {}", + t.transactionalId(), + t.producerId(), + t.state()); + }); + assertTransactionState(kafkaAdmin, transactionalId, + org.apache.kafka.clients.admin.TransactionState.ONGOING); + Awaitility.await().untilAsserted(() -> { + assertTransactionState(kafkaAdmin, transactionalId, + org.apache.kafka.clients.admin.TransactionState.ONGOING); + }); + producer.commitTransaction(); + Awaitility.await().untilAsserted(() -> { + assertTransactionState(kafkaAdmin, transactionalId, + org.apache.kafka.clients.admin.TransactionState.COMPLETE_COMMIT); + }); + producer.beginTransaction(); + + assertTransactionState(kafkaAdmin, transactionalId, + org.apache.kafka.clients.admin.TransactionState.COMPLETE_COMMIT); + + producer.send(new ProducerRecord<>(topicName, 1, "bar")).get(); + producer.flush(); + producer.abortTransaction(); + Awaitility.await().untilAsserted(() -> { + assertTransactionState(kafkaAdmin, transactionalId, + org.apache.kafka.clients.admin.TransactionState.COMPLETE_ABORT); + }); + producer.close(); + assertTransactionState(kafkaAdmin, transactionalId, + org.apache.kafka.clients.admin.TransactionState.COMPLETE_ABORT); + } + + private static void assertTransactionState(AdminClient kafkaAdmin, String transactionalId, + org.apache.kafka.clients.admin.TransactionState transactionState) + throws Exception { + ListTransactionsResult listTransactionsResult = kafkaAdmin.listTransactions(); + Collection transactionListings = listTransactionsResult.all().get(); + transactionListings.forEach(t -> { + log.info("Found transactionalId: {} {} {}", + t.transactionalId(), + t.producerId(), + t.state()); + }); + TransactionListing transactionListing = transactionListings + .stream() + .filter(t -> t.transactionalId().equals(transactionalId)) + .findFirst() + .get(); + assertEquals(transactionState, transactionListing.state()); + + // filter for the same state + ListTransactionsOptions optionFilterState = new ListTransactionsOptions() + .filterStates(Collections.singleton(transactionState)); + listTransactionsResult = kafkaAdmin.listTransactions(optionFilterState); + transactionListings = listTransactionsResult.all().get(); + transactionListing = transactionListings + .stream() + .filter(t -> t.transactionalId().equals(transactionalId)) + .findFirst() + .get(); + assertEquals(transactionState, transactionListing.state()); + + + // filter for the same producer id + ListTransactionsOptions optionFilterProducer = new ListTransactionsOptions() + .filterProducerIds(Collections.singleton(transactionListing.producerId())); + listTransactionsResult = kafkaAdmin.listTransactions(optionFilterProducer); + transactionListings = listTransactionsResult.all().get(); + transactionListing = transactionListings + .stream() + .filter(t -> t.transactionalId().equals(transactionalId)) + .findFirst() + .get(); + assertEquals(transactionState, transactionListing.state()); + + // filter for the same producer id and state + ListTransactionsOptions optionFilterProducerAndState = new ListTransactionsOptions() + .filterStates(Collections.singleton(transactionState)) + .filterProducerIds(Collections.singleton(transactionListing.producerId())); + listTransactionsResult = kafkaAdmin.listTransactions(optionFilterProducerAndState); + transactionListings = listTransactionsResult.all().get(); + transactionListing = transactionListings + .stream() + .filter(t -> t.transactionalId().equals(transactionalId)) + .findFirst() + .get(); + assertEquals(transactionState, transactionListing.state()); + } + /** * Get the Kafka server address. */