Skip to content
This repository has been archived by the owner on Jan 24, 2024. It is now read-only.

Commit

Permalink
[transactions] Implement KIP-664 listTransactions (#76)
Browse files Browse the repository at this point in the history
(cherry picked from commit 5ef4a85)
  • Loading branch information
eolivelli authored and gaoran10 committed Jul 30, 2023
1 parent 930e502 commit 3a4e749
Show file tree
Hide file tree
Showing 9 changed files with 279 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,9 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
case LIST_GROUPS:
handleListGroupsRequest(kafkaHeaderAndRequest, responseFuture);
break;
case LIST_TRANSACTIONS:
handleListTransactionsRequest(kafkaHeaderAndRequest, responseFuture);
break;
case DELETE_GROUPS:
handleDeleteGroupsRequest(kafkaHeaderAndRequest, responseFuture);
break;
Expand Down Expand Up @@ -572,6 +575,9 @@ protected void handleError(KafkaHeaderAndRequest kafkaHeaderAndRequest,
protected abstract void
handleListGroupsRequest(KafkaHeaderAndRequest listGroups, CompletableFuture<AbstractResponse> response);

protected abstract void
handleListTransactionsRequest(KafkaHeaderAndRequest listGroups, CompletableFuture<AbstractResponse> response);

protected abstract void
handleDeleteGroupsRequest(KafkaHeaderAndRequest deleteGroups, CompletableFuture<AbstractResponse> response);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import io.streamnative.pulsar.handlers.kop.exceptions.KoPTopicException;
import io.streamnative.pulsar.handlers.kop.offset.OffsetAndMetadata;
import io.streamnative.pulsar.handlers.kop.offset.OffsetMetadata;
import io.streamnative.pulsar.handlers.kop.scala.Either;
import io.streamnative.pulsar.handlers.kop.security.SaslAuthenticator;
import io.streamnative.pulsar.handlers.kop.security.Session;
import io.streamnative.pulsar.handlers.kop.security.auth.Authorizer;
Expand Down Expand Up @@ -125,6 +126,7 @@
import org.apache.kafka.common.message.LeaveGroupRequestData;
import org.apache.kafka.common.message.ListOffsetsRequestData;
import org.apache.kafka.common.message.ListOffsetsResponseData;
import org.apache.kafka.common.message.ListTransactionsResponseData;
import org.apache.kafka.common.message.OffsetCommitRequestData;
import org.apache.kafka.common.message.ProduceRequestData;
import org.apache.kafka.common.message.SaslAuthenticateResponseData;
Expand Down Expand Up @@ -173,6 +175,8 @@
import org.apache.kafka.common.requests.ListOffsetRequestV0;
import org.apache.kafka.common.requests.ListOffsetsRequest;
import org.apache.kafka.common.requests.ListOffsetsResponse;
import org.apache.kafka.common.requests.ListTransactionsRequest;
import org.apache.kafka.common.requests.ListTransactionsResponse;
import org.apache.kafka.common.requests.MetadataRequest;
import org.apache.kafka.common.requests.MetadataResponse.PartitionMetadata;
import org.apache.kafka.common.requests.MetadataResponse.TopicMetadata;
Expand Down Expand Up @@ -2038,8 +2042,26 @@ protected void handleDescribeGroupRequest(KafkaHeaderAndRequest describeGroup,
protected void handleListGroupsRequest(KafkaHeaderAndRequest listGroups,
CompletableFuture<AbstractResponse> resultFuture) {
checkArgument(listGroups.getRequest() instanceof ListGroupsRequest);
KeyValue<Errors, List<GroupOverview>> listResult = getGroupCoordinator().handleListGroups();
resultFuture.complete(KafkaResponseUtils.newListGroups(listResult.getKey(), listResult.getValue()));
Either<Errors, List<GroupOverview>> listResult = getGroupCoordinator().handleListGroups();
resultFuture.complete(KafkaResponseUtils.newListGroups(listResult));
}

@Override
protected void handleListTransactionsRequest(KafkaHeaderAndRequest listTransactions,
CompletableFuture<AbstractResponse> resultFuture) {
checkArgument(listTransactions.getRequest() instanceof ListTransactionsRequest);
ListTransactionsRequest request = (ListTransactionsRequest) listTransactions.getRequest();
List<String> stateFilters = request.data().stateFilters();
if (stateFilters == null) {
stateFilters = Collections.emptyList();
}
List<Long> producerIdFilters = request.data().producerIdFilters();
if (producerIdFilters == null) {
producerIdFilters = Collections.emptyList();
}
ListTransactionsResponseData listResult = getTransactionCoordinator()
.handleListTransactions(stateFilters, producerIdFilters);
resultFuture.complete(new ListTransactionsResponse(listResult));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import io.streamnative.pulsar.handlers.kop.coordinator.group.GroupMetadata.GroupOverview;
import io.streamnative.pulsar.handlers.kop.coordinator.group.GroupMetadata.GroupSummary;
import io.streamnative.pulsar.handlers.kop.offset.OffsetAndMetadata;
import io.streamnative.pulsar.handlers.kop.scala.Either;
import io.streamnative.pulsar.handlers.kop.utils.CoreUtils;
import io.streamnative.pulsar.handlers.kop.utils.delayed.DelayedOperationKey.GroupKey;
import io.streamnative.pulsar.handlers.kop.utils.delayed.DelayedOperationKey.MemberKey;
Expand Down Expand Up @@ -832,22 +833,16 @@ public KeyValue<Errors, Map<TopicPartition, PartitionData>> handleFetchOffsets(
);
}

public KeyValue<Errors, List<GroupOverview>> handleListGroups() {
public Either<Errors, List<GroupOverview>> handleListGroups() {
if (!isActive.get()) {
return new KeyValue<>(Errors.COORDINATOR_NOT_AVAILABLE, new ArrayList<>());
return Either.left(Errors.COORDINATOR_NOT_AVAILABLE);
} else {
Errors errors;
if (groupManager.isLoading()) {
errors = Errors.COORDINATOR_LOAD_IN_PROGRESS;
} else {
errors = Errors.NONE;
return Either.left(Errors.COORDINATOR_LOAD_IN_PROGRESS);
}
List<GroupOverview> overviews = new ArrayList<>();
groupManager.currentGroups().forEach(group -> overviews.add(group.overview()));
return new KeyValue<>(
errors,
overviews
);
return Either.right(overviews);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import io.streamnative.pulsar.handlers.kop.storage.PulsarTopicProducerStateManagerSnapshotBuffer;
import io.streamnative.pulsar.handlers.kop.utils.MetadataUtils;
import io.streamnative.pulsar.handlers.kop.utils.ProducerIdAndEpoch;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
Expand All @@ -51,6 +52,7 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.internals.Topic;
import org.apache.kafka.common.message.ListTransactionsResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.requests.TransactionResult;
Expand Down Expand Up @@ -79,6 +81,8 @@ public class TransactionCoordinator {

private final Time time;

private final AtomicBoolean isActive = new AtomicBoolean(false);

private static final BiConsumer<TransactionStateManager.TransactionalIdAndProducerIdEpoch, Errors>
onEndTransactionComplete =
(txnIdAndPidEpoch, errors) -> {
Expand Down Expand Up @@ -215,6 +219,17 @@ public static String getTopicPartitionName(String topicPartitionName, int partit
return topicPartitionName + PARTITIONED_TOPIC_SUFFIX + partitionId;
}

public ListTransactionsResponseData handleListTransactions(List<String> filteredStates,
List<Long> filteredProducerIds) {
// https://github.com/apache/kafka/blob/915991445fde106d02e61a70425ae2601c813db0/core/
// src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala#L259
if (!isActive.get()) {
log.warn("The transaction coordinator is not active, so it will reject list transaction request");
return new ListTransactionsResponseData().setErrorCode(Errors.NOT_COORDINATOR.code());
}
return this.txnManager.listTransactionStates(filteredProducerIds, filteredStates);
}

@Data
@EqualsAndHashCode
@AllArgsConstructor
Expand Down Expand Up @@ -925,7 +940,8 @@ public CompletableFuture<Void> startup(boolean enableTransactionalIdExpiration)
txnManager.startup(enableTransactionalIdExpiration);

return this.producerIdManager.initialize().thenCompose(ignored -> {
log.info("Startup transaction coordinator complete.");
log.info("{} Startup transaction coordinator complete.", namespacePrefixForMetadata);
isActive.set(true);
return CompletableFuture.completedFuture(null);
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,4 +117,26 @@ public boolean isExpirationAllowed() {
return false;
}
}

public org.apache.kafka.clients.admin.TransactionState toAdminState() {
switch (this) {
case EMPTY:
return org.apache.kafka.clients.admin.TransactionState.EMPTY;
case ONGOING:
return org.apache.kafka.clients.admin.TransactionState.ONGOING;
case PREPARE_COMMIT:
return org.apache.kafka.clients.admin.TransactionState.PREPARE_COMMIT;
case PREPARE_ABORT:
return org.apache.kafka.clients.admin.TransactionState.PREPARE_ABORT;
case COMPLETE_COMMIT:
return org.apache.kafka.clients.admin.TransactionState.COMPLETE_COMMIT;
case COMPLETE_ABORT:
return org.apache.kafka.clients.admin.TransactionState.COMPLETE_ABORT;
case PREPARE_EPOCH_FENCE:
return org.apache.kafka.clients.admin.TransactionState.PREPARE_EPOCH_FENCE;
case DEAD:
default:
return org.apache.kafka.clients.admin.TransactionState.UNKNOWN;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand All @@ -40,6 +41,7 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.message.ListTransactionsResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.SchemaException;
import org.apache.kafka.common.requests.ProduceResponse;
Expand Down Expand Up @@ -244,6 +246,71 @@ private boolean shouldExpire(TransactionMetadata txnMetadata, Long currentTimeMs
<= (currentTimeMs - transactionConfig.getTransactionalIdExpirationMs());
}

private static boolean shouldInclude(TransactionMetadata txnMetadata,
List<Long> filterProducerIds, Set<String> filterStateNames) {
if (txnMetadata.getState() == TransactionState.DEAD) {
// We filter the `Dead` state since it is a transient state which
// indicates that the transactionalId and its metadata are in the
// process of expiration and removal.
return false;
} else if (!filterProducerIds.isEmpty() && !filterProducerIds.contains(txnMetadata.getProducerId())) {
return false;
} else if (!filterStateNames.isEmpty() && !filterStateNames.contains(
txnMetadata.getState().toAdminState().toString())) {
return false;
} else {
return true;
}
}

public ListTransactionsResponseData listTransactionStates(List<Long> filteredProducerIds,
List<String> filteredStates) {
return CoreUtils.inReadLock(stateLock, () -> {
ListTransactionsResponseData response = new ListTransactionsResponseData();
if (!loadingPartitions.isEmpty()) {
response.setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code());
} else {
Set<String> filterStates = new HashSet<>();
for (TransactionState stateName : TransactionState.values()) {
String nameForTheClient = stateName.toAdminState().toString();
if (filteredStates.contains(nameForTheClient)) {
filterStates.add(nameForTheClient);
} else {
response.unknownStateFilters().add(nameForTheClient);
}
}
List<ListTransactionsResponseData.TransactionState> states = new ArrayList<>();
transactionMetadataCache.forEach((__, cache) -> {
cache.values().forEach(txnMetadata -> {
txnMetadata.inLock(() -> {
// use toString() to get the name of the state according to the protocol
ListTransactionsResponseData.TransactionState transactionState =
new ListTransactionsResponseData.TransactionState()
.setTransactionalId(txnMetadata.getTransactionalId())
.setProducerId(txnMetadata.getProducerId())
.setTransactionState(txnMetadata.getState().toAdminState().toString());

if (shouldInclude(txnMetadata, filteredProducerIds, filterStates)) {
if (log.isDebugEnabled()) {
log.debug("add transaction state: {}", transactionState);
}
states.add(transactionState);
} else {
if (log.isDebugEnabled()) {
log.debug("Skip transaction state: {}", transactionState);
}
}
return null;
});
});
});
response.setErrorCode(Errors.NONE.code())
.setTransactionStates(states);
}
return response;
});
}

@Data
@AllArgsConstructor
private static class TransactionalIdCoordinatorEpochAndMetadata {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import io.streamnative.pulsar.handlers.kop.ApiVersion;
import io.streamnative.pulsar.handlers.kop.coordinator.group.GroupMetadata;
import io.streamnative.pulsar.handlers.kop.scala.Either;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -278,14 +279,18 @@ public static LeaveGroupResponse newLeaveGroup(Errors errors) {
return new LeaveGroupResponse(data);
}

public static ListGroupsResponse newListGroups(Errors errors,
List<GroupMetadata.GroupOverview> groups) {
public static ListGroupsResponse newListGroups(Either<Errors, List<GroupMetadata.GroupOverview>> results) {
ListGroupsResponseData data = new ListGroupsResponseData();
data.setErrorCode(errors.code());
data.setGroups(groups.stream().map(overView -> new ListGroupsResponseData.ListedGroup()
.setGroupId(overView.groupId())
.setProtocolType(overView.protocolType()))
.collect(Collectors.toList()));
data.setErrorCode(results.isLeft() ? results.getLeft().code() : Errors.NONE.code());
if (!results.isLeft()) {
data.setGroups(results.getRight().stream().map(overView -> new ListGroupsResponseData.ListedGroup()
.setGroupId(overView.groupId())
.setProtocolType(overView.protocolType()))
.collect(Collectors.toList()));

} else {
data.setGroups(Collections.emptyList());
}
return new ListGroupsResponse(data);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import static org.mockito.Mockito.spy;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
Expand All @@ -28,6 +29,7 @@
import io.streamnative.pulsar.handlers.kop.coordinator.group.GroupMetadata.GroupSummary;
import io.streamnative.pulsar.handlers.kop.coordinator.group.MemberMetadata.MemberSummary;
import io.streamnative.pulsar.handlers.kop.offset.OffsetAndMetadata;
import io.streamnative.pulsar.handlers.kop.scala.Either;
import io.streamnative.pulsar.handlers.kop.utils.delayed.DelayedOperationPurgatory;
import io.streamnative.pulsar.handlers.kop.utils.timer.MockTimer;
import java.util.ArrayList;
Expand Down Expand Up @@ -218,8 +220,8 @@ public void testRequestHandlingWhileLoadingInProgress() throws Exception {
assertEquals(Errors.COORDINATOR_LOAD_IN_PROGRESS, describeGroupResult.getKey());

// ListGroups
KeyValue<Errors, List<GroupOverview>> listGroupsResult = groupCoordinator.handleListGroups();
assertEquals(Errors.COORDINATOR_LOAD_IN_PROGRESS, listGroupsResult.getKey());
Either<Errors, List<GroupOverview>> listGroupsResult = groupCoordinator.handleListGroups();
assertEquals(Errors.COORDINATOR_LOAD_IN_PROGRESS, listGroupsResult.getLeft());

// DeleteGroups
Map<String, Errors> deleteGroupsErrors = groupCoordinator.handleDeleteGroups(
Expand Down Expand Up @@ -1695,12 +1697,12 @@ groupId, memberId, protocolType, newProtocols()
).get();
assertEquals(Errors.NONE, syncGroupResult.getKey());

KeyValue<Errors, List<GroupOverview>> groups = groupCoordinator.handleListGroups();
assertEquals(Errors.NONE, groups.getKey());
assertEquals(1, groups.getValue().size());
Either<Errors, List<GroupOverview>> groups = groupCoordinator.handleListGroups();
assertFalse(groups.isLeft());
assertEquals(1, groups.getRight().size());
assertEquals(
new GroupOverview("groupId", "consumer"),
groups.getValue().get(0)
groups.getRight().get(0)
);
}

Expand All @@ -1712,12 +1714,12 @@ groupId, memberId, protocolType, newProtocols()
);
assertEquals(Errors.NONE, joinGroupResult.getError());

KeyValue<Errors, List<GroupOverview>> groups = groupCoordinator.handleListGroups();
assertEquals(Errors.NONE, groups.getKey());
assertEquals(1, groups.getValue().size());
Either<Errors, List<GroupOverview>> groups = groupCoordinator.handleListGroups();
assertFalse(groups.isLeft());
assertEquals(1, groups.getRight().size());
assertEquals(
new GroupOverview("groupId", "consumer"),
groups.getValue().get(0)
groups.getRight().get(0)
);
}

Expand Down
Loading

0 comments on commit 3a4e749

Please sign in to comment.