[changelog][dvc] Disable TC internal retry in ThinClientMetaStoreBasedRepository (#1235)

* [changelog][dvc] Disable TC internal retry in ThinClientMetaStoreBasedRepository

1. The existing TC internal retry implemented in RetriableStoreClient is vulnerable to a deadlock
because a deserialization thread both performs the retry and waits on it. Once the thread pool is
exhausted, every deserialization thread waits forever, since no thread is left to handle the
transport response. We will fix this internal retry in a separate commit.
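
To make the failure mode concrete, here is a minimal, hypothetical Java sketch of that pattern. It is
not the actual RetriableStoreClient code; the class name, pool size, and method names are invented for
illustration. The retry runs on the same pool that deserializes transport responses, and the retrying
thread blocks on the result, so once the number of concurrently failing requests reaches the pool size,
no thread is left to complete any response.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch of the deadlock pattern; NOT the actual RetriableStoreClient code.
public class RetryDeadlockSketch {
  // Shared pool that deserializes transport responses (the size here is an arbitrary assumption).
  private static final ExecutorService DESERIALIZATION_POOL = Executors.newFixedThreadPool(2);

  // A request whose response is deserialized on the shared pool.
  static CompletableFuture<String> sendRequest(String key, boolean fail) {
    return CompletableFuture.supplyAsync(() -> {
      if (fail) {
        throw new RuntimeException("transient transport error for " + key);
      }
      return "value-for-" + key;
    }, DESERIALIZATION_POOL);
  }

  // The problematic pattern: the retry runs inside the deserialization callback, and the callback
  // thread blocks until another pool thread deserializes the retried response.
  static CompletableFuture<String> getWithInternalRetry(String key) {
    return sendRequest(key, true).handleAsync((value, error) -> {
      if (error == null) {
        return value;
      }
      // join() parks this deserialization thread. Once every pool thread is parked here,
      // no thread is left to deserialize any retried response, so everything hangs.
      return sendRequest(key, false).join();
    }, DESERIALIZATION_POOL);
  }

  public static void main(String[] args) {
    // A single failing request still succeeds because another pool thread can serve the retry;
    // with two or more concurrently failing requests, both callbacks block in join() and the JVM hangs.
    System.out.println(getWithInternalRetry("storeA").join());
    DESERIALIZATION_POOL.shutdown();
  }
}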

2. Disable the TC internal retry in ThinClientMetaStoreBasedRepository, because we recently added an
external retry with RetryUtils.executeWithMaxAttempt and a timeout. Also remove the hardcoded retry
configs from NativeMetadataRepository, since they are no longer needed and were leaking implementation details.
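
For context, a minimal sketch of what retrying at the call site (instead of inside the thin client)
looks like. The helper below is a hypothetical stand-in with an assumed signature; the real code uses
Venice's RetryUtils.executeWithMaxAttempt together with a timeout, which this diff does not show.

import java.time.Duration;
import java.util.function.Supplier;

// Hypothetical stand-in for the external retry; not Venice's RetryUtils API.
public final class ExternalRetrySketch {
  static <T> T executeWithMaxAttempt(Supplier<T> action, int maxAttempts, Duration backoff) {
    RuntimeException lastError = null;
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      try {
        return action.get();
      } catch (RuntimeException e) {
        lastError = e;
        if (attempt < maxAttempts) {
          try {
            // Fixed backoff between attempts (an assumption; the real helper may back off differently).
            Thread.sleep(backoff.toMillis());
          } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted while retrying", ie);
          }
        }
      }
    }
    throw lastError;
  }

  public static void main(String[] args) {
    // The caller owns the retry and the overall deadline, so the thin client underneath can run
    // with its internal retry disabled.
    String metaValue = executeWithMaxAttempt(() -> "store meta value", 3, Duration.ofSeconds(1));
    System.out.println(metaValue);
  }
}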

3. Added a staleness metric in NativeMetadataRepository that should help us detect any issue that
causes the local metadata to become stale (including the deadlock mentioned above), for both the DVC
and changelog use cases.

* Addressed review comments
xunyin8 authored Oct 15, 2024
1 parent 776149e commit 9a3f892
Showing 9 changed files with 174 additions and 37 deletions.
@@ -5,6 +5,7 @@
import static com.linkedin.venice.system.store.MetaStoreWriter.KEY_STRING_STORE_NAME;
import static java.lang.Thread.currentThread;

import com.linkedin.davinci.stats.NativeMetadataRepositoryStats;
import com.linkedin.venice.client.exceptions.ServiceDiscoveryException;
import com.linkedin.venice.client.store.ClientConfig;
import com.linkedin.venice.common.VeniceSystemStoreType;
@@ -30,6 +31,7 @@
import com.linkedin.venice.systemstore.schemas.StoreMetaValue;
import com.linkedin.venice.utils.VeniceProperties;
import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
import java.time.Clock;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -57,9 +59,6 @@
*/
public abstract class NativeMetadataRepository
implements SubscriptionBasedReadOnlyStoreRepository, ReadOnlySchemaRepository, ClusterInfoProvider {
protected static final int THIN_CLIENT_RETRY_COUNT = 3;
protected static final long THIN_CLIENT_RETRY_BACKOFF_MS = 10000;

private static final long DEFAULT_REFRESH_INTERVAL_IN_SECONDS = 60;
private static final Logger LOGGER = LogManager.getLogger(NativeMetadataRepository.class);

@@ -78,13 +77,23 @@ public abstract class NativeMetadataRepository

private final long refreshIntervalInSeconds;

private final NativeMetadataRepositoryStats nativeMetadataRepositoryStats;

private final Clock clock;
private AtomicBoolean started = new AtomicBoolean(false);

protected NativeMetadataRepository(ClientConfig clientConfig, VeniceProperties backendConfig) {
this(clientConfig, backendConfig, Clock.systemUTC());
}

protected NativeMetadataRepository(ClientConfig clientConfig, VeniceProperties backendConfig, Clock clock) {
refreshIntervalInSeconds = backendConfig.getLong(
CLIENT_SYSTEM_STORE_REPOSITORY_REFRESH_INTERVAL_SECONDS,
NativeMetadataRepository.DEFAULT_REFRESH_INTERVAL_IN_SECONDS);
this.clientConfig = clientConfig;
this.nativeMetadataRepositoryStats =
new NativeMetadataRepositoryStats(clientConfig.getMetricsRepository(), "native_metadata_repository", clock);
this.clock = clock;
}

public synchronized void start() {
@@ -172,6 +181,7 @@ public Store refreshOneStore(String storeName) {
if (newStore != null && !storeConfig.isDeleting()) {
putStore(newStore);
getAndCacheSchemaDataFromSystemStore(storeName);
nativeMetadataRepositoryStats.updateCacheTimestamp(storeName, clock.millis());
} else {
removeStore(storeName);
}
@@ -467,6 +477,7 @@ protected Store putStore(Store newStore) {
protected Store removeStore(String storeName) {
// Remove the store name from the subscription.
Store oldStore = subscribedStoreMap.remove(storeName);
nativeMetadataRepositoryStats.removeCacheTimestamp(storeName);
if (oldStore != null) {
totalStoreReadQuota.addAndGet(-oldStore.getReadQuotaInCU());
notifyStoreDeleted(oldStore);
@@ -112,10 +112,9 @@ private AvroSpecificStoreClient<StoreMetaKey, StoreMetaValue> getAvroClientForMe
ClientConfig<StoreMetaValue> clonedClientConfig = ClientConfig.cloneConfig(clientConfig)
.setStoreName(VeniceSystemStoreType.META_STORE.getSystemStoreName(storeName))
.setSpecificValueClass(StoreMetaValue.class)
.setRetryOnAllErrors(true)
.setRetryCount(THIN_CLIENT_RETRY_COUNT)
.setForceClusterDiscoveryAtStartTime(true)
.setRetryBackOffInMs(THIN_CLIENT_RETRY_BACKOFF_MS);
.setRetryOnRouterError(false)
.setRetryOnAllErrors(false)
.setForceClusterDiscoveryAtStartTime(true);
return ClientFactory.getAndStartSpecificAvroClient(clonedClientConfig);
});
}
@@ -59,6 +59,9 @@ public VeniceMetadataRepositoryBuilder(
boolean isIngestionIsolation) {
this.configLoader = configLoader;
this.clientConfig = clientConfig;
if (clientConfig != null) {
clientConfig.setMetricsRepository(metricsRepository);
}
this.metricsRepository = metricsRepository;
this.isIngestionIsolation = isIngestionIsolation;
this.icProvider = icProvider;
@@ -0,0 +1,42 @@
package com.linkedin.davinci.stats;

import com.linkedin.venice.stats.AbstractVeniceStats;
import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
import io.tehuti.metrics.MetricsRepository;
import io.tehuti.metrics.Sensor;
import io.tehuti.metrics.stats.AsyncGauge;
import java.time.Clock;
import java.util.Map;


public class NativeMetadataRepositoryStats extends AbstractVeniceStats {
private final Sensor storeMetadataStalenessSensor;
private final Map<String, Long> metadataCacheTimestampMapInMs = new VeniceConcurrentHashMap<>();
private final Clock clock;

public NativeMetadataRepositoryStats(MetricsRepository metricsRepository, String name, Clock clock) {
super(metricsRepository, name);
this.clock = clock;
this.storeMetadataStalenessSensor = registerSensor(
new AsyncGauge(
(ignored1, ignored2) -> getMetadataStalenessHighWatermarkMs(),
"store_metadata_staleness_high_watermark_ms"));
}

final double getMetadataStalenessHighWatermarkMs() {
if (this.metadataCacheTimestampMapInMs.isEmpty()) {
return Double.NaN;
} else {
long oldest = metadataCacheTimestampMapInMs.values().stream().min(Long::compareTo).get();
return clock.millis() - oldest;
}
}

public void updateCacheTimestamp(String storeName, long cacheTimeStampInMs) {
metadataCacheTimestampMapInMs.put(storeName, cacheTimeStampInMs);
}

public void removeCacheTimestamp(String storeName) {
metadataCacheTimestampMapInMs.remove(storeName);
}
}
@@ -9,9 +9,7 @@
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@@ -64,6 +62,7 @@
import com.linkedin.venice.utils.Utils;
import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
import com.linkedin.venice.views.ChangeCaptureView;
import io.tehuti.metrics.MetricsRepository;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -182,6 +181,7 @@ public void setUp() {
.setLocalD2ZkHosts(TEST_ZOOKEEPER_ADDRESS)
.setRocksDBBlockCacheSizeInBytes(TEST_ROCKSDB_BLOCK_CACHE_SIZE_IN_BYTES)
.setDatabaseSyncBytesInterval(TEST_DB_SYNC_BYTES_INTERVAL);
changelogClientConfig.getInnerClientConfig().setMetricsRepository(new MetricsRepository());
bootstrappingVeniceChangelogConsumer =
new InternalLocalBootstrappingVeniceChangelogConsumer<>(changelogClientConfig, pubSubConsumer, null);

@@ -51,6 +51,7 @@
import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
import com.linkedin.venice.utils.lazy.Lazy;
import com.linkedin.venice.views.ChangeCaptureView;
import io.tehuti.metrics.MetricsRepository;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
@@ -125,10 +126,7 @@ public void testConsumeBeforeAndAfterImage() throws ExecutionException, Interrup
newVersionTopic,
false);
ChangelogClientConfig changelogClientConfig =
new ChangelogClientConfig<>().setD2ControllerClient(d2ControllerClient)
.setSchemaReader(schemaReader)
.setStoreName(storeName)
.setViewName("changeCaptureView");
getChangelogClientConfig(d2ControllerClient).setViewName("changeCaptureView");
VeniceChangelogConsumerImpl<String, Utf8> veniceChangelogConsumer =
new VeniceChangelogConsumerImpl<>(changelogClientConfig, mockPubSubConsumer);
Assert.assertEquals(veniceChangelogConsumer.getPartitionCount(), 2);
@@ -194,11 +192,7 @@ public void testAfterImageConsumerSeek() throws ExecutionException, InterruptedE
PubSubTopic oldVersionTopic = pubSubTopicRepository.getTopic(Version.composeKafkaTopic(storeName, 1));

prepareVersionTopicRecordsToBePolled(0L, 5L, mockPubSubConsumer, oldVersionTopic, 0, true);
ChangelogClientConfig changelogClientConfig =
new ChangelogClientConfig<>().setD2ControllerClient(d2ControllerClient)
.setSchemaReader(schemaReader)
.setStoreName(storeName)
.setViewName("");
ChangelogClientConfig changelogClientConfig = getChangelogClientConfig(d2ControllerClient).setViewName("");

VeniceChangelogConsumerImpl mockInternalSeekConsumer = Mockito.mock(VeniceChangelogConsumerImpl.class);
Mockito.when(mockInternalSeekConsumer.subscribe(any())).thenReturn(CompletableFuture.completedFuture(null));
@@ -279,11 +273,7 @@ public void testConsumeAfterImage() throws ExecutionException, InterruptedExcept
pubSubTopicRepository.getTopic(oldVersionTopic + ChangeCaptureView.CHANGE_CAPTURE_TOPIC_SUFFIX);

prepareVersionTopicRecordsToBePolled(0L, 5L, mockPubSubConsumer, oldVersionTopic, 0, true);
ChangelogClientConfig changelogClientConfig =
new ChangelogClientConfig<>().setD2ControllerClient(d2ControllerClient)
.setSchemaReader(schemaReader)
.setStoreName(storeName)
.setViewName("");
ChangelogClientConfig changelogClientConfig = getChangelogClientConfig(d2ControllerClient).setViewName("");
VeniceChangelogConsumerImpl<String, Utf8> veniceChangelogConsumer =
new VeniceAfterImageConsumerImpl<>(changelogClientConfig, mockPubSubConsumer);
Assert.assertEquals(veniceChangelogConsumer.getPartitionCount(), 2);
@@ -351,11 +341,7 @@ public void testMetricReportingThread() throws InterruptedException {
PubSubTopic oldVersionTopic = pubSubTopicRepository.getTopic(Version.composeKafkaTopic(storeName, 1));

prepareVersionTopicRecordsToBePolled(0L, 5L, mockPubSubConsumer, oldVersionTopic, 0, true);
ChangelogClientConfig changelogClientConfig =
new ChangelogClientConfig<>().setD2ControllerClient(d2ControllerClient)
.setSchemaReader(schemaReader)
.setStoreName(storeName)
.setViewName("");
ChangelogClientConfig changelogClientConfig = getChangelogClientConfig(d2ControllerClient).setViewName("");
VeniceChangelogConsumerImpl<String, Utf8> veniceChangelogConsumer =
new VeniceAfterImageConsumerImpl<>(changelogClientConfig, mockPubSubConsumer);
Assert.assertEquals(veniceChangelogConsumer.getPartitionCount(), 2);
@@ -560,4 +546,13 @@ private PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> constructStartOfPush
PubSubTopicPartition pubSubTopicPartition = new PubSubTopicPartitionImpl(versionTopic, partition);
return new ImmutablePubSubMessage<>(kafkaKey, kafkaMessageEnvelope, pubSubTopicPartition, 0, 0, 0);
}

private ChangelogClientConfig getChangelogClientConfig(D2ControllerClient d2ControllerClient) {
ChangelogClientConfig changelogClientConfig =
new ChangelogClientConfig<>().setD2ControllerClient(d2ControllerClient)
.setSchemaReader(schemaReader)
.setStoreName(storeName);
changelogClientConfig.getInnerClientConfig().setMetricsRepository(new MetricsRepository());
return changelogClientConfig;
}
}
@@ -20,25 +20,35 @@
import com.linkedin.venice.systemstore.schemas.StoreMetaValue;
import com.linkedin.venice.systemstore.schemas.StoreValueSchema;
import com.linkedin.venice.systemstore.schemas.StoreValueSchemas;
import com.linkedin.venice.utils.TestUtils;
import com.linkedin.venice.utils.VeniceProperties;
import io.tehuti.Metric;
import io.tehuti.metrics.MetricsRepository;
import java.time.Clock;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;


public class NativeMetadataRepositoryTest {
private ClientConfig clientConfig;
private VeniceProperties backendConfig;

private MetricsRepository metricsRepository;
private Clock clock;
private static final String STORE_NAME = "hardware_store";

@BeforeClass
@BeforeMethod
public void setUpMocks() {
clientConfig = mock(ClientConfig.class);
backendConfig = mock(VeniceProperties.class);
doReturn(1L).when(backendConfig).getLong(eq(CLIENT_SYSTEM_STORE_REPOSITORY_REFRESH_INTERVAL_SECONDS), anyLong());
metricsRepository = new MetricsRepository();
doReturn(metricsRepository).when(clientConfig).getMetricsRepository();
clock = mock(Clock.class);
doReturn(0L).when(clock).millis();
}

@Test
@@ -56,7 +66,7 @@ public void testGetInstance() {

@Test
public void testGetSchemaDataFromReadThroughCache() throws InterruptedException {
TestNMR nmr = new TestNMR(clientConfig, backendConfig);
TestNMR nmr = new TestNMR(clientConfig, backendConfig, clock);
nmr.start();
Assert.assertThrows(VeniceNoStoreException.class, () -> nmr.getKeySchema(STORE_NAME));
nmr.subscribe(STORE_NAME);
@@ -67,7 +77,7 @@ public void testGetSchemaDataEfficiently() throws InterruptedException {
public void testGetSchemaDataEfficiently() throws InterruptedException {
doReturn(Long.MAX_VALUE).when(backendConfig)
.getLong(eq(CLIENT_SYSTEM_STORE_REPOSITORY_REFRESH_INTERVAL_SECONDS), anyLong());
TestNMR nmr = new TestNMR(clientConfig, backendConfig);
TestNMR nmr = new TestNMR(clientConfig, backendConfig, clock);
nmr.start();
Assert.assertEquals(nmr.keySchemaRequestCount, 0);
Assert.assertEquals(nmr.valueSchemasRequestCount, 0);
@@ -97,13 +107,55 @@ public void testGetSchemaDataEfficiently() throws InterruptedException {
Assert.assertNotNull(nmr.getValueSchema(STORE_NAME, 2));
}

/**
* We are using {@link TestUtils#waitForNonDeterministicAssertion(long, TimeUnit, TestUtils.NonDeterministicAssertion)}
* because in some rare cases the underlying async gauge will return a stale value
*/
@Test
public void testNativeMetadataRepositoryStats() throws InterruptedException {
doReturn(Long.MAX_VALUE).when(backendConfig)
.getLong(eq(CLIENT_SYSTEM_STORE_REPOSITORY_REFRESH_INTERVAL_SECONDS), anyLong());
TestNMR nmr = new TestNMR(clientConfig, backendConfig, clock);
nmr.start();
nmr.subscribe(STORE_NAME);
doReturn(1000L).when(clock).millis();
String stalenessMetricName = ".native_metadata_repository--store_metadata_staleness_high_watermark_ms.Gauge";
TestUtils.waitForNonDeterministicAssertion(500, TimeUnit.MILLISECONDS, () -> {
Metric staleness = metricsRepository.getMetric(stalenessMetricName);
Assert.assertNotNull(staleness);
Assert.assertEquals(staleness.value(), 1000d);
});
String anotherStoreName = STORE_NAME + "V2";
nmr.subscribe(anotherStoreName);
nmr.refreshOneStore(anotherStoreName);
// After one store refresh we should still see staleness increase because it reports the max amongst all stores
doReturn(2000L).when(clock).millis();
TestUtils.waitForNonDeterministicAssertion(
500,
TimeUnit.MILLISECONDS,
() -> Assert.assertEquals(metricsRepository.getMetric(stalenessMetricName).value(), 2000d));
// Refresh both stores and staleness should decrease
nmr.refresh();
TestUtils.waitForNonDeterministicAssertion(
500,
TimeUnit.MILLISECONDS,
() -> Assert.assertEquals(metricsRepository.getMetric(stalenessMetricName).value(), 0d));
nmr.unsubscribe(STORE_NAME);
nmr.unsubscribe(anotherStoreName);
// Unsubscribing stores should remove their corresponding staleness metric
TestUtils.waitForNonDeterministicAssertion(
500,
TimeUnit.MILLISECONDS,
() -> Assert.assertEquals(metricsRepository.getMetric(stalenessMetricName).value(), Double.NaN));
}

static class TestNMR extends NativeMetadataRepository {
int keySchemaRequestCount = 0;
int valueSchemasRequestCount = 0;
int specificValueSchemaRequestCount = 0;

protected TestNMR(ClientConfig clientConfig, VeniceProperties backendConfig) {
super(clientConfig, backendConfig);
protected TestNMR(ClientConfig clientConfig, VeniceProperties backendConfig, Clock clock) {
super(clientConfig, backendConfig, clock);
}

@Override
@@ -116,7 +168,7 @@ protected StoreConfig getStoreConfigFromSystemStore(String storeName) {
@Override
protected Store getStoreFromSystemStore(String storeName, String clusterName) {
Store store = mock(Store.class);
when(store.getName()).thenReturn(STORE_NAME);
when(store.getName()).thenReturn(storeName);
when(store.getReadQuotaInCU()).thenReturn(1L);
return store;
}
@@ -0,0 +1,33 @@
package com.linkedin.davinci.stats;

import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;

import io.tehuti.metrics.MetricsRepository;
import java.time.Clock;
import org.testng.Assert;
import org.testng.annotations.Test;


public class NativeMetadataRepositoryStatsTest {
@Test
public void testStats() {
Clock mockClock = mock(Clock.class);
doReturn(1000L).when(mockClock).millis();
String store1 = "testStore1";
String store2 = "testStore2";
NativeMetadataRepositoryStats stats = new NativeMetadataRepositoryStats(new MetricsRepository(), "test", mockClock);
Assert.assertEquals(stats.getMetadataStalenessHighWatermarkMs(), Double.NaN);
stats.updateCacheTimestamp(store1, 0);
Assert.assertEquals(stats.getMetadataStalenessHighWatermarkMs(), 1000d);
stats.updateCacheTimestamp(store2, 1000);
Assert.assertEquals(stats.getMetadataStalenessHighWatermarkMs(), 1000d);
doReturn(1500L).when(mockClock).millis();
stats.updateCacheTimestamp(store1, 1100);
Assert.assertEquals(stats.getMetadataStalenessHighWatermarkMs(), 500d);
stats.removeCacheTimestamp(store2);
Assert.assertEquals(stats.getMetadataStalenessHighWatermarkMs(), 400d);
stats.removeCacheTimestamp(store1);
Assert.assertEquals(stats.getMetadataStalenessHighWatermarkMs(), Double.NaN);
}
}