From b9ca3123171fa952481789932188123dad27d82a Mon Sep 17 00:00:00 2001 From: Enrico Olivelli Date: Tue, 7 Feb 2023 12:21:50 +0100 Subject: [PATCH] [cleanup] Create only one instance of KafkaTopicLookupService --- .../handlers/kop/KafkaChannelInitializer.java | 7 +++- .../handlers/kop/KafkaProtocolHandler.java | 4 ++ .../handlers/kop/KafkaRequestHandler.java | 3 +- .../handlers/kop/KafkaTopicManager.java | 41 +++++++++++-------- .../kop/KafkaTopicConsumerManagerTest.java | 3 +- 5 files changed, 37 insertions(+), 21 deletions(-) diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaChannelInitializer.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaChannelInitializer.java index 980dfcbf2e..dce9ccce96 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaChannelInitializer.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaChannelInitializer.java @@ -50,6 +50,7 @@ public class KafkaChannelInitializer extends ChannelInitializer { private final KopBrokerLookupManager kopBrokerLookupManager; @Getter private final KafkaTopicManagerSharedState kafkaTopicManagerSharedState; + private final KafkaTopicLookupService kafkaTopicLookupService; private final LookupClient lookupClient; private final AdminManager adminManager; @@ -82,6 +83,7 @@ public KafkaChannelInitializer(PulsarService pulsarService, RequestStats requestStats, OrderedScheduler sendResponseScheduler, KafkaTopicManagerSharedState kafkaTopicManagerSharedState, + KafkaTopicLookupService kafkaTopicLookupService, LookupClient lookupClient) { super(); this.pulsarService = pulsarService; @@ -104,6 +106,7 @@ public KafkaChannelInitializer(PulsarService pulsarService, } this.sendResponseScheduler = sendResponseScheduler; this.kafkaTopicManagerSharedState = kafkaTopicManagerSharedState; + this.kafkaTopicLookupService = kafkaTopicLookupService; this.lengthFieldPrepender = new LengthFieldPrepender(4); } @@ -130,7 +133,7 @@ public KafkaRequestHandler newCnx() throws Exception { tenantContextManager, replicaManager, kopBrokerLookupManager, adminManager, producePurgatory, fetchPurgatory, enableTls, advertisedEndPoint, skipMessagesWithoutIndex, requestStats, sendResponseScheduler, - kafkaTopicManagerSharedState, lookupClient); + kafkaTopicManagerSharedState, kafkaTopicLookupService, lookupClient); } @VisibleForTesting @@ -141,6 +144,6 @@ public KafkaRequestHandler newCnx(final TenantContextManager tenantContextManage enableTls, advertisedEndPoint, skipMessagesWithoutIndex, requestStats, sendResponseScheduler, - kafkaTopicManagerSharedState, lookupClient); + kafkaTopicManagerSharedState, kafkaTopicLookupService, lookupClient); } } diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaProtocolHandler.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaProtocolHandler.java index 90ab7de0fd..3fb99536c2 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaProtocolHandler.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaProtocolHandler.java @@ -80,6 +80,7 @@ public class KafkaProtocolHandler implements ProtocolHandler, TenantContextManag private SystemTopicClient txnTopicClient; private DelayedOperationPurgatory producePurgatory; private DelayedOperationPurgatory fetchPurgatory; + private KafkaTopicLookupService kafkaTopicLookupService; private LookupClient lookupClient; @VisibleForTesting @Getter @@ -400,6 +401,7 @@ private KafkaChannelInitializer newKafkaChannelInitializer(final EndPoint endPoi requestStats, sendResponseScheduler, kafkaTopicManagerSharedState, + kafkaTopicLookupService, lookupClient); } @@ -418,6 +420,8 @@ public Map> newChannelIniti .timeoutTimer(SystemTimer.builder().executorName("fetch").build()) .build(); + kafkaTopicLookupService = new KafkaTopicLookupService(brokerService); + replicaManager = new ReplicaManager( kafkaConfig, requestStats, diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java index 1642301101..151f2081ec 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java @@ -317,6 +317,7 @@ public KafkaRequestHandler(PulsarService pulsarService, RequestStats requestStats, OrderedScheduler sendResponseScheduler, KafkaTopicManagerSharedState kafkaTopicManagerSharedState, + KafkaTopicLookupService kafkaTopicLookupService, LookupClient lookupClient) throws Exception { super(requestStats, kafkaConfig, sendResponseScheduler); this.pulsarService = pulsarService; @@ -343,7 +344,7 @@ public KafkaRequestHandler(PulsarService pulsarService, this.tlsEnabled = tlsEnabled; this.advertisedEndPoint = advertisedEndPoint; this.skipMessagesWithoutIndex = skipMessagesWithoutIndex; - this.topicManager = new KafkaTopicManager(this); + this.topicManager = new KafkaTopicManager(this, kafkaTopicLookupService); this.defaultNumPartitions = kafkaConfig.getDefaultNumPartitions(); this.maxReadEntriesNum = kafkaConfig.getMaxReadEntriesNum(); this.currentConnectedGroup = new ConcurrentHashMap<>(); diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaTopicManager.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaTopicManager.java index 2f51ba8d88..f63fc51b6a 100644 --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaTopicManager.java +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaTopicManager.java @@ -16,6 +16,7 @@ import java.net.SocketAddress; import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.PulsarService; @@ -41,14 +42,14 @@ public class KafkaTopicManager { private final AtomicBoolean closed = new AtomicBoolean(false); - KafkaTopicManager(KafkaRequestHandler kafkaRequestHandler) { + KafkaTopicManager(KafkaRequestHandler kafkaRequestHandler, KafkaTopicLookupService kafkaTopicLookupService) { this.requestHandler = kafkaRequestHandler; PulsarService pulsarService = kafkaRequestHandler.getPulsarService(); this.brokerService = pulsarService.getBrokerService(); this.internalServerCnx = new InternalServerCnx(requestHandler); this.lookupClient = kafkaRequestHandler.getLookupClient(); - this.kafkaTopicLookupService = new KafkaTopicLookupService(pulsarService.getBrokerService()); - } + this.kafkaTopicLookupService = kafkaTopicLookupService; + } // update Ctx information, since at internalServerCnx create time there is no ctx passed into kafkaRequestHandler. public void setRemoteAddress(SocketAddress remoteAddress) { @@ -101,12 +102,12 @@ public CompletableFuture getTopicConsumerManager(Stri private Producer registerInPersistentTopic(PersistentTopic persistentTopic) { Producer producer = new InternalProducer(persistentTopic, internalServerCnx, - lookupClient.getPulsarClient().newRequestId(), - brokerService.generateUniqueProducerName()); + lookupClient.getPulsarClient().newRequestId(), + brokerService.generateUniqueProducerName()); if (log.isDebugEnabled()) { log.debug("[{}] Register Mock Producer {} into PersistentTopic {}", - requestHandler.ctx.channel(), producer, persistentTopic.getName()); + requestHandler.ctx.channel(), producer, persistentTopic.getName()); } // this will register and add USAGE_COUNT_UPDATER. @@ -122,8 +123,9 @@ public Optional registerProducerInPersistentTopic(String topicName, Pe } return Optional.empty(); } - return Optional.of(requestHandler.getKafkaTopicManagerSharedState() - .getReferences().computeIfAbsent(topicName, (__) -> registerInPersistentTopic(persistentTopic))); + ConcurrentHashMap references = requestHandler + .getKafkaTopicManagerSharedState().getReferences(); + return Optional.of(references.computeIfAbsent(topicName, (__) -> registerInPersistentTopic(persistentTopic))); } // when channel close, release all the topics reference in persistentTopic @@ -141,18 +143,23 @@ public void close() { } public CompletableFuture> getTopic(String topicName) { - if (closed.get()) { - if (log.isDebugEnabled()) { - log.debug("[{}] Return null for getTopic({}) since channel is closing", - requestHandler.ctx.channel(), topicName); + try { + if (closed.get()) { + if (log.isDebugEnabled()) { + log.debug("[{}] Return null for getTopic({}) since channel is closing", + requestHandler.ctx.channel(), topicName); + } + return CompletableFuture.completedFuture(Optional.empty()); } + CompletableFuture> topicCompletableFuture = + kafkaTopicLookupService.getTopic(topicName, requestHandler.ctx.channel()); + // cache for removing producer + requestHandler.getKafkaTopicManagerSharedState().getTopics().put(topicName, topicCompletableFuture); + return topicCompletableFuture; + } catch (Throwable error) { + log.error("Unhandled error for {}", topicName, error); return CompletableFuture.completedFuture(Optional.empty()); } - CompletableFuture> topicCompletableFuture = - kafkaTopicLookupService.getTopic(topicName, requestHandler.ctx.channel()); - // cache for removing producer - requestHandler.getKafkaTopicManagerSharedState().getTopics().put(topicName, topicCompletableFuture); - return topicCompletableFuture; } public void invalidateCacheForFencedManagerLedgerOnTopic(String fullTopicName) { diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaTopicConsumerManagerTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaTopicConsumerManagerTest.java index 4fa30ff5fb..aefc3534f8 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaTopicConsumerManagerTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaTopicConsumerManagerTest.java @@ -87,7 +87,8 @@ protected void setup() throws Exception { doReturn(mockChannel).when(mockCtx).channel(); kafkaRequestHandler.ctx = mockCtx; - kafkaTopicManager = new KafkaTopicManager(kafkaRequestHandler); + kafkaTopicManager = new KafkaTopicManager(kafkaRequestHandler, + new KafkaTopicLookupService(pulsar.getBrokerService())); kafkaTopicManager.setRemoteAddress(InternalServerCnx.MOCKED_REMOTE_ADDRESS); }