Skip to content

Commit

Permalink
move methods from Version to Utils
Browse files Browse the repository at this point in the history
  • Loading branch information
arjun4084346 committed Nov 20, 2024
1 parent f048d32 commit e85ea7c
Show file tree
Hide file tree
Showing 53 changed files with 141 additions and 137 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ void updateLagMonitor(
if (version == null) {
// During version deletion, the version will be deleted from ZK prior to servers perform resource deletion.
// It's valid to have null version when trying to remove lag monitor for the deleted resource.
version = new VersionImpl(storeName, storeVersion, "", Version.getRealTimeTopicName(store));
version = new VersionImpl(storeName, storeVersion, "", Utils.getRealTimeTopicName(store));
}
lagMonFunction.accept(version, getPartition());
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,7 @@ public StoreIngestionTask(
this.versionTopic = pubSubTopicRepository.getTopic(kafkaVersionTopic);
this.storeName = versionTopic.getStoreName();
this.isUserSystemStore = VeniceSystemStoreUtils.isUserSystemStore(storeName);
this.realTimeTopic = pubSubTopicRepository.getTopic(Version.getRealTimeTopicName(store));
this.realTimeTopic = pubSubTopicRepository.getTopic(Utils.getRealTimeTopicName(store));
this.versionNumber = Version.parseVersionFromKafkaTopicName(kafkaVersionTopic);
this.consumerActionsQueue = new PriorityBlockingQueue<>(CONSUMER_ACTION_QUEUE_INIT_CAPACITY);
this.partitionToPendingConsumerActionCountMap = new VeniceConcurrentHashMap<>();
Expand Down Expand Up @@ -3780,7 +3780,7 @@ private void waitUntilValueSchemaAvailable(int schemaId) throws InterruptedExcep
// cluster these metastore writes could be spiky
if (metaStoreWriter != null && !VeniceSystemStoreType.META_STORE.isSystemStore(storeName)) {
String metaStoreName = VeniceSystemStoreType.META_STORE.getSystemStoreName(storeName);
PubSubTopic metaStoreRT = pubSubTopicRepository.getTopic(Version.composeRealTimeTopic(metaStoreName));
PubSubTopic metaStoreRT = pubSubTopicRepository.getTopic(Utils.composeRealTimeTopic(metaStoreName));
if (getTopicManager(localKafkaServer).containsTopicWithRetries(metaStoreRT, 5)) {
metaStoreWriter.writeInUseValueSchema(storeName, versionNumber, schemaId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import com.linkedin.venice.serialization.avro.AvroProtocolDefinition;
import com.linkedin.venice.utils.DataProviderUtils;
import com.linkedin.venice.utils.Pair;
import com.linkedin.venice.utils.Utils;
import com.linkedin.venice.utils.VeniceProperties;
import io.tehuti.metrics.MetricsRepository;
import it.unimi.dsi.fastutil.ints.Int2ObjectMaps;
Expand Down Expand Up @@ -279,8 +280,8 @@ public void testGetIngestingTopicsNotWithOnlineVersion() {
OfflinePushStrategy.WAIT_ALL_REPLICAS,
1);
mockStore.addVersion(new VersionImpl(storeName, 1, "test-job-id", mockStore.getRealTimeTopicName()));
toBeDeletedStore.addVersion(
new VersionImpl(deletedStoreName, 1, "test-job-id", Version.getRealTimeTopicName(toBeDeletedStore)));
toBeDeletedStore
.addVersion(new VersionImpl(deletedStoreName, 1, "test-job-id", Utils.getRealTimeTopicName(toBeDeletedStore)));
doReturn(mockStore).when(mockMetadataRepo).getStore(storeName);
doReturn(toBeDeletedStore).when(mockMetadataRepo).getStore(deletedStoreName);
doReturn(mockStore).when(mockMetadataRepo).getStoreOrThrow(storeName);
Expand All @@ -300,7 +301,7 @@ public void testGetIngestingTopicsNotWithOnlineVersion() {
kafkaStoreIngestionService.getIngestingTopicsWithVersionStatusNotOnline().size(),
0,
"Expecting an empty set since all ingesting topics have version status of ONLINE");
mockStore.addVersion(new VersionImpl(storeName, 2, "test-job-id", Version.getRealTimeTopicName(mockStore)));
mockStore.addVersion(new VersionImpl(storeName, 2, "test-job-id", Utils.getRealTimeTopicName(mockStore)));
doReturn(new Pair<>(mockStore, mockStore.getVersion(2))).when(mockMetadataRepo)
.waitVersion(eq(storeName), eq(2), any());
kafkaStoreIngestionService.startConsumption(new VeniceStoreVersionConfig(topic2, veniceProperties), 0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ public void testReportIfCatchUpBaseTopicOffsetRouteWillNotMakePushTimeout() {
* {@link StoreIngestionTask#reportIfCatchUpVersionTopicOffset(PartitionConsumptionState)}
*/
doReturn(true).when(mockOffsetRecord).isEndOfPushReceived();
doReturn(Version.getRealTimeTopicName(mockStore)).when(mockOffsetRecord).getLeaderTopic();
doReturn(Utils.getRealTimeTopicName(mockStore)).when(mockOffsetRecord).getLeaderTopic();
/**
* Return 0 as the max offset for VT and 1 as the overall consume progress, so reportIfCatchUpVersionTopicOffset()
* will determine that base topic is caught up.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2046,7 +2046,7 @@ public void testCompleteCalledWhenUnsubscribeAfterBatchPushDisabled(AAConfig aaC
storeNameWithoutVersionInfo,
1,
Version.numberBasedDummyPushId(1),
Version.getRealTimeTopicName(mockStore)),
Utils.getRealTimeTopicName(mockStore)),
mockStore.getRealTimeTopicName()).when(mockStore).getVersion(1);
doReturn(mockStore).when(mockMetadataRepo).getStoreOrThrow(storeNameWithoutVersionInfo);
doReturn(getOffsetRecord(offset, true)).when(mockStorageMetadataService).getLastOffset(topic, PARTITION_FOO);
Expand Down Expand Up @@ -2462,7 +2462,7 @@ public void testDelayedTransitionToOnlineInHybridMode(AAConfig aaConfig) throws

localVeniceWriter.broadcastTopicSwitch(
Collections.singletonList(inMemoryLocalKafkaBroker.getKafkaBootstrapServer()),
Version.getRealTimeTopicName(storeInfo),
Utils.getRealTimeTopicName(storeInfo),
System.currentTimeMillis() - TimeUnit.SECONDS.toMillis(10),
Collections.emptyMap());
for (int partition: ALL_PARTITIONS) {
Expand Down Expand Up @@ -2825,7 +2825,7 @@ public void testRecordsCanBeThrottledPerRegion() throws ExecutionException, Inte
kafkaUrlToRecordsThrottler.put(inMemoryLocalKafkaBroker.getKafkaBootstrapServer(), localThrottler);
kafkaUrlToRecordsThrottler.put(inMemoryRemoteKafkaBroker.getKafkaBootstrapServer(), remoteThrottler);

String rtTopic = Version.getRealTimeTopicName(storeInfo);
String rtTopic = Utils.getRealTimeTopicName(storeInfo);
PubSubTopic rtPubSubTopic = pubSubTopicRepository.getTopic(rtTopic);
PubSubTopicPartition fooRtPartition = new PubSubTopicPartitionImpl(rtPubSubTopic, PARTITION_FOO);

Expand Down Expand Up @@ -2952,7 +2952,7 @@ public void testIsReadyToServe(NodeType nodeType, AAConfig aaConfig, DataReplica
false,
Optional.empty(),
null);
String rtTopicName = Version.getRealTimeTopicName(mockStore);
String rtTopicName = Utils.getRealTimeTopicName(mockStore);
PubSubTopic rtTopic = pubSubTopicRepository.getTopic(rtTopicName);
TopicSwitch topicSwitchWithSourceRealTimeTopic = new TopicSwitch();
topicSwitchWithSourceRealTimeTopic.sourceKafkaServers = new ArrayList<>();
Expand Down Expand Up @@ -3177,7 +3177,7 @@ public void testActiveActiveStoreIsReadyToServe(HybridConfig hybridConfig, NodeT
assertNotNull(activeActiveStoreIngestionTask.getIngestionBatchProcessor().getLockManager());
}

String rtTopicName = Version.getRealTimeTopicName(mockStore);
String rtTopicName = Utils.getRealTimeTopicName(mockStore);
PubSubTopic rtTopic = pubSubTopicRepository.getTopic(rtTopicName);
TopicSwitch topicSwitchWithMultipleSourceKafkaServers = new TopicSwitch();
topicSwitchWithMultipleSourceKafkaServers.sourceKafkaServers = new ArrayList<>();
Expand Down Expand Up @@ -3567,7 +3567,7 @@ public void testProcessTopicSwitch(NodeType nodeType) {
TopicSwitch topicSwitchWithRemoteRealTimeTopic = new TopicSwitch();
topicSwitchWithRemoteRealTimeTopic.sourceKafkaServers = new ArrayList<>();
topicSwitchWithRemoteRealTimeTopic.sourceKafkaServers.add(inMemoryRemoteKafkaBroker.getKafkaBootstrapServer());
topicSwitchWithRemoteRealTimeTopic.sourceTopicName = Version.getRealTimeTopicName(mockStore);
topicSwitchWithRemoteRealTimeTopic.sourceTopicName = Utils.getRealTimeTopicName(mockStore);
topicSwitchWithRemoteRealTimeTopic.rewindStartTimestamp = System.currentTimeMillis();
ControlMessage controlMessage = new ControlMessage();
controlMessage.controlMessageUnion = topicSwitchWithRemoteRealTimeTopic;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import com.linkedin.venice.pubsub.adapter.kafka.consumer.ApacheKafkaConsumerAdapter;
import com.linkedin.venice.pubsub.api.PubSubMessage;
import com.linkedin.venice.pubsub.api.PubSubTopicPartition;
import com.linkedin.venice.utils.Utils;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
Expand All @@ -50,7 +51,7 @@ public class TestAdminToolConsumption {
@Test
void testAdminToolAdminMessageConsumption() {
int assignedPartition = 0;
String topic = Version.composeRealTimeTopic(STORE_NAME);
String topic = Utils.composeRealTimeTopic(STORE_NAME);
PubSubTopicPartition pubSubTopicPartition =
new PubSubTopicPartitionImpl(pubSubTopicRepository.getTopic(topic), assignedPartition);
int adminMessageNum = 10;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -909,7 +909,7 @@ private void configureMockTransportClient(
versionCreationResponse.setPartitionerClass(partitionerConfig.getPartitionerClass());
versionCreationResponse.setPartitionerParams(partitionerConfig.getPartitionerParams());
versionCreationResponse.setKafkaBootstrapServers("localhost:9092");
versionCreationResponse.setKafkaTopic(Version.getRealTimeTopicName(store));
versionCreationResponse.setKafkaTopic(Utils.getRealTimeTopicName(store));
versionCreationResponse.setEnableSSL(false);

return getTransportClientFuture(MAPPER.writeValueAsBytes(versionCreationResponse), delayInResponseMs);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.meta.PartitionerConfig;
import com.linkedin.venice.meta.StoreInfo;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.partitioner.VenicePartitioner;
import com.linkedin.venice.serialization.avro.AvroProtocolDefinition;
import com.linkedin.venice.utils.PartitionUtils;
import com.linkedin.venice.utils.Utils;
import com.linkedin.venice.utils.VeniceProperties;
import com.linkedin.venice.writer.VeniceWriter;
import com.linkedin.venice.writer.VeniceWriterFactory;
Expand Down Expand Up @@ -56,7 +56,7 @@ public PushJobHeartbeatSender createHeartbeatSender(
StoreInfo storeInfo = heartBeatStoreResponse.getStore();
PartitionerConfig partitionerConfig = storeInfo.getPartitionerConfig();
int partitionNum = storeInfo.getPartitionCount();
String heartbeatKafkaTopicName = Version.getRealTimeTopicName(storeInfo);
String heartbeatKafkaTopicName = Utils.getRealTimeTopicName(storeInfo);
VeniceWriter<byte[], byte[], byte[]> veniceWriter = getVeniceWriter(
heartbeatKafkaTopicName,
partitionerConfig,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@
import com.linkedin.venice.meta.PartitionerConfig;
import com.linkedin.venice.meta.PartitionerConfigImpl;
import com.linkedin.venice.meta.StoreInfo;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.serialization.avro.AvroProtocolDefinition;
import com.linkedin.venice.status.protocol.BatchJobHeartbeatKey;
import com.linkedin.venice.status.protocol.BatchJobHeartbeatValue;
import com.linkedin.venice.utils.Utils;
import com.linkedin.venice.utils.VeniceProperties;
import java.util.Optional;
import java.util.Properties;
Expand All @@ -34,7 +34,7 @@ public void testHeartbeatSenderCreation() {
StoreResponse storeResponse = mock(StoreResponse.class);
StoreInfo storeInfo = mock(StoreInfo.class);
PartitionerConfig partitionerConfig = new PartitionerConfigImpl();
doReturn(Version.composeRealTimeTopic(heartbeatStoreName)).when(storeInfo).getRealTimeTopicName();
doReturn(Utils.composeRealTimeTopic(heartbeatStoreName)).when(storeInfo).getRealTimeTopicName();
doReturn(1).when(storeInfo).getPartitionCount();
doReturn(partitionerConfig).when(storeInfo).getPartitionerConfig();
doReturn(storeInfo).when(storeResponse).getStore();
Expand All @@ -61,6 +61,6 @@ public void testHeartbeatSenderCreation() {
(DefaultPushJobHeartbeatSender) pushJobHeartbeatSender;
Assert.assertEquals(
defaultPushJobHeartbeatSender.getVeniceWriter().getTopicName(),
Version.composeRealTimeTopic(heartbeatStoreName));
Utils.composeRealTimeTopic(heartbeatStoreName));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -530,7 +530,7 @@ private AbstractAvroStoreClient getMockStoreClient(boolean updateEnabled, int de
versionCreationResponse.setPartitionerClass(partitionerConfig.getPartitionerClass());
versionCreationResponse.setPartitionerParams(partitionerConfig.getPartitionerParams());
versionCreationResponse.setKafkaBootstrapServers("localhost:9092");
versionCreationResponse.setKafkaTopic(Version.getRealTimeTopicName(store));
versionCreationResponse.setKafkaTopic(Utils.getRealTimeTopicName(store));
versionCreationResponse.setEnableSSL(false);

CompletableFuture<byte[]> requestTopicFuture = mock(CompletableFuture.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.linkedin.venice.exceptions.StoreVersionNotFoundException;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.systemstore.schemas.StoreVersion;
import com.linkedin.venice.utils.Utils;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -256,7 +257,7 @@ public VersionStatus getVersionStatus(int versionNumber) {
private Version increaseVersion(String pushJobId, boolean createNewVersion) {
int versionNumber = getLargestUsedVersionNumber() + 1;
checkDisableStoreWrite("increase", versionNumber);
Version version = new VersionImpl(getName(), versionNumber, pushJobId, Version.getRealTimeTopicName(this));
Version version = new VersionImpl(getName(), versionNumber, pushJobId, Utils.getRealTimeTopicName(this));
if (createNewVersion) {
addVersion(version);
return version.cloneVersion();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.logging.log4j.util.Strings;


/**
Expand Down Expand Up @@ -294,30 +293,6 @@ static String composeKafkaTopic(String storeName, int versionNumber) {
return storeName + VERSION_SEPARATOR + versionNumber;
}

/** This method should only be used for system stores.
* For other stores, use {@link Version#getRealTimeTopicName(Store)}, {@link Version#getRealTimeTopicName(StoreInfo)} or
* {@link Version#getRealTimeTopicName(Version)}
*/
static String composeRealTimeTopic(String storeName) {
return storeName + REAL_TIME_TOPIC_SUFFIX;
}

static String getRealTimeTopicName(Store store) {
return getRealTimeTopicNameIfEmpty(store.getRealTimeTopicName(), store.getName());
}

static String getRealTimeTopicName(StoreInfo storeInfo) {
return getRealTimeTopicNameIfEmpty(storeInfo.getRealTimeTopicName(), storeInfo.getName());
}

static String getRealTimeTopicName(Version version) {
return getRealTimeTopicNameIfEmpty(version.getRealTimeTopicName(), version.getStoreName());
}

static String getRealTimeTopicNameIfEmpty(String realTimeTopicName, String storeName) {
return Strings.isBlank(realTimeTopicName) ? composeRealTimeTopic(storeName) : realTimeTopicName;
}

static String composeSeparateRealTimeTopic(String storeName) {
return storeName + SEPARATE_REAL_TIME_TOPIC_SUFFIX;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package com.linkedin.venice.pushstatushelper;

import com.linkedin.venice.common.VeniceSystemStoreUtils;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.serialization.avro.AvroProtocolDefinition;
import com.linkedin.venice.serialization.avro.VeniceAvroKafkaSerializer;
import com.linkedin.venice.utils.Utils;
import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
import com.linkedin.venice.writer.VeniceWriter;
import com.linkedin.venice.writer.VeniceWriterFactory;
Expand Down Expand Up @@ -35,7 +35,7 @@ public PushStatusStoreVeniceWriterCache(VeniceWriterFactory writerFactory, Schem

public VeniceWriter prepareVeniceWriter(String storeName) {
return veniceWriters.computeIfAbsent(storeName, s -> {
String rtTopic = Version.composeRealTimeTopic(VeniceSystemStoreUtils.getDaVinciPushStatusStoreName(storeName));
String rtTopic = Utils.composeRealTimeTopic(VeniceSystemStoreUtils.getDaVinciPushStatusStoreName(storeName));
VeniceWriterOptions options = new VeniceWriterOptions.Builder(rtTopic)
.setKeySerializer(
new VeniceAvroKafkaSerializer(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import com.linkedin.venice.meta.Instance;
import com.linkedin.venice.meta.Store;
import com.linkedin.venice.meta.StoreConfig;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.meta.ZKStore;
import com.linkedin.venice.pubsub.PubSubTopicRepository;
import com.linkedin.venice.pubsub.api.PubSubTopic;
Expand All @@ -24,6 +23,7 @@
import com.linkedin.venice.systemstore.schemas.StoreReplicaStatus;
import com.linkedin.venice.systemstore.schemas.StoreValueSchema;
import com.linkedin.venice.systemstore.schemas.StoreValueSchemas;
import com.linkedin.venice.utils.Utils;
import com.linkedin.venice.utils.VeniceResourceCloseResult;
import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
import com.linkedin.venice.writer.VeniceWriter;
Expand Down Expand Up @@ -415,7 +415,7 @@ Map<String, VeniceWriter> getMetaStoreWriterMap() {

VeniceWriter getOrCreateMetaStoreWriter(String metaStoreName) {
return metaStoreWriterMap.computeIfAbsent(metaStoreName, k -> {
PubSubTopic rtTopic = pubSubTopicRepository.getTopic(Version.composeRealTimeTopic(metaStoreName));
PubSubTopic rtTopic = pubSubTopicRepository.getTopic(Utils.composeRealTimeTopic(metaStoreName));
if (!topicManager.containsTopicAndAllPartitionsAreOnline(rtTopic)) {
throw new VeniceException("Realtime topic: " + rtTopic + " doesn't exist or some partitions are not online");
}
Expand Down Expand Up @@ -460,7 +460,7 @@ private void closeVeniceWriter(String metaStoreName, VeniceWriter veniceWriter,
* to write a Control Message to the RT topic, and it could hang if the topic doesn't exist.
* This check is a best-effort since the race condition is still there between topic check and closing VeniceWriter.
*/
PubSubTopic rtTopic = pubSubTopicRepository.getTopic(Version.composeRealTimeTopic(metaStoreName));
PubSubTopic rtTopic = pubSubTopicRepository.getTopic(Utils.composeRealTimeTopic(metaStoreName));
if (!topicManager.containsTopicAndAllPartitionsAreOnline(rtTopic)) {
LOGGER.info(
"RT topic: {} for meta system store: {} doesn't exist, will only close the internal producer without sending END_OF_SEGMENT control messages",
Expand Down
Loading

0 comments on commit e85ea7c

Please sign in to comment.