Skip to content
This repository has been archived by the owner on Jan 24, 2024. It is now read-only.

Commit

Permalink
[cleanup] Remove static LOOKUP_CLIENT_MAP
Browse files Browse the repository at this point in the history
  • Loading branch information
eolivelli committed Feb 7, 2023
1 parent 5398f77 commit 57f910c
Show file tree
Hide file tree
Showing 8 changed files with 40 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ public class KafkaChannelInitializer extends ChannelInitializer<SocketChannel> {
private final KopBrokerLookupManager kopBrokerLookupManager;
@Getter
private final KafkaTopicManagerSharedState kafkaTopicManagerSharedState;
private final LookupClient lookupClient;

private final AdminManager adminManager;
private DelayedOperationPurgatory<DelayedOperation> producePurgatory;
Expand Down Expand Up @@ -80,13 +81,15 @@ public KafkaChannelInitializer(PulsarService pulsarService,
boolean skipMessagesWithoutIndex,
RequestStats requestStats,
OrderedScheduler sendResponseScheduler,
KafkaTopicManagerSharedState kafkaTopicManagerSharedState) {
KafkaTopicManagerSharedState kafkaTopicManagerSharedState,
LookupClient lookupClient) {
super();
this.pulsarService = pulsarService;
this.kafkaConfig = kafkaConfig;
this.tenantContextManager = tenantContextManager;
this.replicaManager = replicaManager;
this.kopBrokerLookupManager = kopBrokerLookupManager;
this.lookupClient = lookupClient;
this.adminManager = adminManager;
this.producePurgatory = producePurgatory;
this.fetchPurgatory = fetchPurgatory;
Expand Down Expand Up @@ -127,7 +130,7 @@ public KafkaRequestHandler newCnx() throws Exception {
tenantContextManager, replicaManager, kopBrokerLookupManager, adminManager,
producePurgatory, fetchPurgatory,
enableTls, advertisedEndPoint, skipMessagesWithoutIndex, requestStats, sendResponseScheduler,
kafkaTopicManagerSharedState);
kafkaTopicManagerSharedState, lookupClient);
}

@VisibleForTesting
Expand All @@ -138,6 +141,6 @@ public KafkaRequestHandler newCnx(final TenantContextManager tenantContextManage
enableTls, advertisedEndPoint, skipMessagesWithoutIndex,
requestStats,
sendResponseScheduler,
kafkaTopicManagerSharedState);
kafkaTopicManagerSharedState, lookupClient);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import lombok.Getter;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.commons.configuration.Configuration;
Expand All @@ -52,7 +51,6 @@
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.utils.Time;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.protocol.ProtocolHandler;
Expand All @@ -71,8 +69,6 @@ public class KafkaProtocolHandler implements ProtocolHandler, TenantContextManag

public static final String PROTOCOL_NAME = "kafka";
public static final String TLS_HANDLER = "tls";
private static final Map<PulsarService, LookupClient> LOOKUP_CLIENT_MAP = new ConcurrentHashMap<>();

@Getter
private RequestStats requestStats;
private PrometheusMetricsProvider statsProvider;
Expand All @@ -84,6 +80,7 @@ public class KafkaProtocolHandler implements ProtocolHandler, TenantContextManag
private SystemTopicClient txnTopicClient;
private DelayedOperationPurgatory<DelayedOperation> producePurgatory;
private DelayedOperationPurgatory<DelayedOperation> fetchPurgatory;
private LookupClient lookupClient;
@VisibleForTesting
@Getter
private Map<InetSocketAddress, ChannelInitializer<SocketChannel>> channelInitializerMap;
Expand Down Expand Up @@ -202,12 +199,12 @@ public void start(BrokerService service) {
throw new IllegalStateException(e);
}

LOOKUP_CLIENT_MAP.put(brokerService.pulsar(), new LookupClient(brokerService.pulsar(), kafkaConfig));
lookupClient = new LookupClient(brokerService.pulsar(), kafkaConfig);
offsetTopicClient = new SystemTopicClient(brokerService.pulsar(), kafkaConfig);
txnTopicClient = new SystemTopicClient(brokerService.pulsar(), kafkaConfig);

try {
kopBrokerLookupManager = new KopBrokerLookupManager(kafkaConfig, brokerService.getPulsar());
kopBrokerLookupManager = new KopBrokerLookupManager(kafkaConfig, brokerService.getPulsar(), lookupClient);
} catch (Exception ex) {
log.error("Failed to get kopBrokerLookupManager", ex);
throw new IllegalStateException(ex);
Expand Down Expand Up @@ -402,7 +399,8 @@ private KafkaChannelInitializer newKafkaChannelInitializer(final EndPoint endPoi
kafkaConfig.isSkipMessagesWithoutIndex(),
requestStats,
sendResponseScheduler,
kafkaTopicManagerSharedState);
kafkaTopicManagerSharedState,
lookupClient);
}

// this is called after initialize, and with kafkaConfig, brokerService all set.
Expand Down Expand Up @@ -456,16 +454,6 @@ public Map<InetSocketAddress, ChannelInitializer<SocketChannel>> newChannelIniti

@Override
public void close() {
Optional.ofNullable(LOOKUP_CLIENT_MAP.remove(brokerService.pulsar())).ifPresent(LookupClient::close);
if (offsetTopicClient != null) {
offsetTopicClient.close();
}
if (txnTopicClient != null) {
txnTopicClient.close();
}
if (adminManager != null) {
adminManager.shutdown();
}
if (producePurgatory != null) {
producePurgatory.shutdown();
}
Expand All @@ -483,6 +471,19 @@ public void close() {
kopBrokerLookupManager.close();
statsProvider.stop();
sendResponseScheduler.shutdown();

if (offsetTopicClient != null) {
offsetTopicClient.close();
}
if (txnTopicClient != null) {
txnTopicClient.close();
}
if (adminManager != null) {
adminManager.shutdown();
}
if (lookupClient != null) {
lookupClient.close();
}
}

@VisibleForTesting
Expand Down Expand Up @@ -571,8 +572,4 @@ public TransactionCoordinator initTransactionCoordinator(String tenant, PulsarAd

return transactionCoordinator;
}

public static @NonNull LookupClient getLookupClient(final PulsarService pulsarService) {
return LOOKUP_CLIENT_MAP.computeIfAbsent(pulsarService, ignored -> new LookupClient(pulsarService));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,9 @@ public class KafkaRequestHandler extends KafkaCommandDecoder {
private final TenantContextManager tenantContextManager;
private final ReplicaManager replicaManager;
private final KopBrokerLookupManager kopBrokerLookupManager;

@Getter
private final LookupClient lookupClient;
@Getter
private final KafkaTopicManagerSharedState kafkaTopicManagerSharedState;

Expand Down Expand Up @@ -313,12 +316,14 @@ public KafkaRequestHandler(PulsarService pulsarService,
boolean skipMessagesWithoutIndex,
RequestStats requestStats,
OrderedScheduler sendResponseScheduler,
KafkaTopicManagerSharedState kafkaTopicManagerSharedState) throws Exception {
KafkaTopicManagerSharedState kafkaTopicManagerSharedState,
LookupClient lookupClient) throws Exception {
super(requestStats, kafkaConfig, sendResponseScheduler);
this.pulsarService = pulsarService;
this.tenantContextManager = tenantContextManager;
this.replicaManager = replicaManager;
this.kopBrokerLookupManager = kopBrokerLookupManager;
this.lookupClient = lookupClient;
this.clusterName = kafkaConfig.getClusterName();
this.executor = pulsarService.getExecutor();
this.admin = pulsarService.getAdminClient();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public class KafkaTopicManager {
PulsarService pulsarService = kafkaRequestHandler.getPulsarService();
this.brokerService = pulsarService.getBrokerService();
this.internalServerCnx = new InternalServerCnx(requestHandler);
this.lookupClient = KafkaProtocolHandler.getLookupClient(pulsarService);
this.lookupClient = kafkaRequestHandler.getLookupClient();
this.kafkaTopicLookupService = new KafkaTopicLookupService(pulsarService.getBrokerService());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,10 @@ public class KopBrokerLookupManager {
public static final ConcurrentHashMap<String, CompletableFuture<InetSocketAddress>>
LOOKUP_CACHE = new ConcurrentHashMap<>();

public KopBrokerLookupManager(KafkaServiceConfiguration conf, PulsarService pulsarService) throws Exception {
public KopBrokerLookupManager(KafkaServiceConfiguration conf, PulsarService pulsarService,
LookupClient lookupClient) throws Exception {
this.pulsar = pulsarService;
this.lookupClient = KafkaProtocolHandler.getLookupClient(pulsarService);
this.lookupClient = lookupClient;
this.metadataStoreCacheLoader = new MetadataStoreCacheLoader(pulsarService.getPulsarResources(),
conf.getBrokerLookupTimeoutMs());
this.selfAdvertisedListeners = conf.getKafkaAdvertisedListeners();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,6 @@ public LookupClient(final PulsarService pulsarService, final KafkaServiceConfigu
super(createPulsarClient(pulsarService, kafkaConfig, conf -> {}));
}

public LookupClient(final PulsarService pulsarService) {
super(createPulsarClient(pulsarService));
log.warn("This constructor should not be called, it's only called "
+ "when the PulsarService doesn't exist in KafkaProtocolHandlers.LOOKUP_CLIENT_UP");
}

public CompletableFuture<InetSocketAddress> getBrokerAddress(final TopicName topicName) {
return getPulsarClient().getLookup().getBroker(topicName).thenApply(Pair::getLeft);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ public class KopBrokerLookupManagerTest extends KopProtocolHandlerTestBase {
private static final String TENANT = "test";
private static final String NAMESPACE = TENANT + "/" + "kop-broker-lookup-manager-test";

private LookupClient lookupClient;
private KopBrokerLookupManager kopBrokerLookupManager;

@BeforeClass
Expand All @@ -43,13 +44,16 @@ protected void setup() throws Exception {
.build());
admin.namespaces().createNamespace(NAMESPACE);
admin.namespaces().setDeduplicationStatus(NAMESPACE, true);
kopBrokerLookupManager = new KopBrokerLookupManager(conf, pulsar);
lookupClient = new LookupClient(pulsar, conf);
kopBrokerLookupManager = new KopBrokerLookupManager(conf, pulsar, lookupClient);
}

@AfterClass
@Override
protected void cleanup() throws Exception {
super.internalCleanup();
kopBrokerLookupManager.close();
lookupClient.close();
}

@Test(timeOut = 20 * 1000)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ protected void createAdmin() throws Exception {
}

protected void createClient() throws Exception {
this.pulsarClient = KafkaProtocolHandler.getLookupClient(pulsar).getPulsarClient();
this.pulsarClient = new LookupClient(pulsar, conf).getPulsarClient();
}

protected String getAdvertisedAddress() {
Expand Down

0 comments on commit 57f910c

Please sign in to comment.