Skip to content
This repository has been archived by the owner on Jan 24, 2024. It is now read-only.

Commit

Permalink
[cleanup] Create only one instance of KafkaTopicLookupService
Browse files Browse the repository at this point in the history
  • Loading branch information
eolivelli committed Feb 7, 2023
1 parent 57f910c commit b9ca312
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ public class KafkaChannelInitializer extends ChannelInitializer<SocketChannel> {
private final KopBrokerLookupManager kopBrokerLookupManager;
@Getter
private final KafkaTopicManagerSharedState kafkaTopicManagerSharedState;
private final KafkaTopicLookupService kafkaTopicLookupService;
private final LookupClient lookupClient;

private final AdminManager adminManager;
Expand Down Expand Up @@ -82,6 +83,7 @@ public KafkaChannelInitializer(PulsarService pulsarService,
RequestStats requestStats,
OrderedScheduler sendResponseScheduler,
KafkaTopicManagerSharedState kafkaTopicManagerSharedState,
KafkaTopicLookupService kafkaTopicLookupService,
LookupClient lookupClient) {
super();
this.pulsarService = pulsarService;
Expand All @@ -104,6 +106,7 @@ public KafkaChannelInitializer(PulsarService pulsarService,
}
this.sendResponseScheduler = sendResponseScheduler;
this.kafkaTopicManagerSharedState = kafkaTopicManagerSharedState;
this.kafkaTopicLookupService = kafkaTopicLookupService;
this.lengthFieldPrepender = new LengthFieldPrepender(4);
}

Expand All @@ -130,7 +133,7 @@ public KafkaRequestHandler newCnx() throws Exception {
tenantContextManager, replicaManager, kopBrokerLookupManager, adminManager,
producePurgatory, fetchPurgatory,
enableTls, advertisedEndPoint, skipMessagesWithoutIndex, requestStats, sendResponseScheduler,
kafkaTopicManagerSharedState, lookupClient);
kafkaTopicManagerSharedState, kafkaTopicLookupService, lookupClient);
}

@VisibleForTesting
Expand All @@ -141,6 +144,6 @@ public KafkaRequestHandler newCnx(final TenantContextManager tenantContextManage
enableTls, advertisedEndPoint, skipMessagesWithoutIndex,
requestStats,
sendResponseScheduler,
kafkaTopicManagerSharedState, lookupClient);
kafkaTopicManagerSharedState, kafkaTopicLookupService, lookupClient);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ public class KafkaProtocolHandler implements ProtocolHandler, TenantContextManag
private SystemTopicClient txnTopicClient;
private DelayedOperationPurgatory<DelayedOperation> producePurgatory;
private DelayedOperationPurgatory<DelayedOperation> fetchPurgatory;
private KafkaTopicLookupService kafkaTopicLookupService;
private LookupClient lookupClient;
@VisibleForTesting
@Getter
Expand Down Expand Up @@ -400,6 +401,7 @@ private KafkaChannelInitializer newKafkaChannelInitializer(final EndPoint endPoi
requestStats,
sendResponseScheduler,
kafkaTopicManagerSharedState,
kafkaTopicLookupService,
lookupClient);
}

Expand All @@ -418,6 +420,8 @@ public Map<InetSocketAddress, ChannelInitializer<SocketChannel>> newChannelIniti
.timeoutTimer(SystemTimer.builder().executorName("fetch").build())
.build();

kafkaTopicLookupService = new KafkaTopicLookupService(brokerService);

replicaManager = new ReplicaManager(
kafkaConfig,
requestStats,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,7 @@ public KafkaRequestHandler(PulsarService pulsarService,
RequestStats requestStats,
OrderedScheduler sendResponseScheduler,
KafkaTopicManagerSharedState kafkaTopicManagerSharedState,
KafkaTopicLookupService kafkaTopicLookupService,
LookupClient lookupClient) throws Exception {
super(requestStats, kafkaConfig, sendResponseScheduler);
this.pulsarService = pulsarService;
Expand All @@ -343,7 +344,7 @@ public KafkaRequestHandler(PulsarService pulsarService,
this.tlsEnabled = tlsEnabled;
this.advertisedEndPoint = advertisedEndPoint;
this.skipMessagesWithoutIndex = skipMessagesWithoutIndex;
this.topicManager = new KafkaTopicManager(this);
this.topicManager = new KafkaTopicManager(this, kafkaTopicLookupService);
this.defaultNumPartitions = kafkaConfig.getDefaultNumPartitions();
this.maxReadEntriesNum = kafkaConfig.getMaxReadEntriesNum();
this.currentConnectedGroup = new ConcurrentHashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import java.net.SocketAddress;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.PulsarService;
Expand All @@ -41,14 +42,14 @@ public class KafkaTopicManager {

private final AtomicBoolean closed = new AtomicBoolean(false);

KafkaTopicManager(KafkaRequestHandler kafkaRequestHandler) {
KafkaTopicManager(KafkaRequestHandler kafkaRequestHandler, KafkaTopicLookupService kafkaTopicLookupService) {
this.requestHandler = kafkaRequestHandler;
PulsarService pulsarService = kafkaRequestHandler.getPulsarService();
this.brokerService = pulsarService.getBrokerService();
this.internalServerCnx = new InternalServerCnx(requestHandler);
this.lookupClient = kafkaRequestHandler.getLookupClient();
this.kafkaTopicLookupService = new KafkaTopicLookupService(pulsarService.getBrokerService());
}
this.kafkaTopicLookupService = kafkaTopicLookupService;
}

// update Ctx information, since at internalServerCnx create time there is no ctx passed into kafkaRequestHandler.
public void setRemoteAddress(SocketAddress remoteAddress) {
Expand Down Expand Up @@ -101,12 +102,12 @@ public CompletableFuture<KafkaTopicConsumerManager> getTopicConsumerManager(Stri

private Producer registerInPersistentTopic(PersistentTopic persistentTopic) {
Producer producer = new InternalProducer(persistentTopic, internalServerCnx,
lookupClient.getPulsarClient().newRequestId(),
brokerService.generateUniqueProducerName());
lookupClient.getPulsarClient().newRequestId(),
brokerService.generateUniqueProducerName());

if (log.isDebugEnabled()) {
log.debug("[{}] Register Mock Producer {} into PersistentTopic {}",
requestHandler.ctx.channel(), producer, persistentTopic.getName());
requestHandler.ctx.channel(), producer, persistentTopic.getName());
}

// this will register and add USAGE_COUNT_UPDATER.
Expand All @@ -122,8 +123,9 @@ public Optional<Producer> registerProducerInPersistentTopic(String topicName, Pe
}
return Optional.empty();
}
return Optional.of(requestHandler.getKafkaTopicManagerSharedState()
.getReferences().computeIfAbsent(topicName, (__) -> registerInPersistentTopic(persistentTopic)));
ConcurrentHashMap<String, Producer> references = requestHandler
.getKafkaTopicManagerSharedState().getReferences();
return Optional.of(references.computeIfAbsent(topicName, (__) -> registerInPersistentTopic(persistentTopic)));
}

// when channel close, release all the topics reference in persistentTopic
Expand All @@ -141,18 +143,23 @@ public void close() {
}

public CompletableFuture<Optional<PersistentTopic>> getTopic(String topicName) {
if (closed.get()) {
if (log.isDebugEnabled()) {
log.debug("[{}] Return null for getTopic({}) since channel is closing",
requestHandler.ctx.channel(), topicName);
try {
if (closed.get()) {
if (log.isDebugEnabled()) {
log.debug("[{}] Return null for getTopic({}) since channel is closing",
requestHandler.ctx.channel(), topicName);
}
return CompletableFuture.completedFuture(Optional.empty());
}
CompletableFuture<Optional<PersistentTopic>> topicCompletableFuture =
kafkaTopicLookupService.getTopic(topicName, requestHandler.ctx.channel());
// cache for removing producer
requestHandler.getKafkaTopicManagerSharedState().getTopics().put(topicName, topicCompletableFuture);
return topicCompletableFuture;
} catch (Throwable error) {
log.error("Unhandled error for {}", topicName, error);
return CompletableFuture.completedFuture(Optional.empty());
}
CompletableFuture<Optional<PersistentTopic>> topicCompletableFuture =
kafkaTopicLookupService.getTopic(topicName, requestHandler.ctx.channel());
// cache for removing producer
requestHandler.getKafkaTopicManagerSharedState().getTopics().put(topicName, topicCompletableFuture);
return topicCompletableFuture;
}

public void invalidateCacheForFencedManagerLedgerOnTopic(String fullTopicName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,8 @@ protected void setup() throws Exception {
doReturn(mockChannel).when(mockCtx).channel();
kafkaRequestHandler.ctx = mockCtx;

kafkaTopicManager = new KafkaTopicManager(kafkaRequestHandler);
kafkaTopicManager = new KafkaTopicManager(kafkaRequestHandler,
new KafkaTopicLookupService(pulsar.getBrokerService()));
kafkaTopicManager.setRemoteAddress(InternalServerCnx.MOCKED_REMOTE_ADDRESS);
}

Expand Down

0 comments on commit b9ca312

Please sign in to comment.