From c20e329a326d9cd3f445efa63fb3406bd2c973d7 Mon Sep 17 00:00:00 2001 From: Yi Hu Date: Fri, 26 Apr 2024 13:04:06 -0400 Subject: [PATCH] Fix reporting metrics not supported warning for BigQueryIO Direct read (#31096) When throttled. This fixes throttling based autoscaling for BigQueryIO DIRECT_READ pipeline on Dataflow --- .../sdk/io/gcp/bigquery/BigQueryServices.java | 8 ++++ .../io/gcp/bigquery/BigQueryServicesImpl.java | 38 ++++++++++++++++--- .../bigquery/BigQueryStorageStreamSource.java | 7 +++- .../bigquery/BigQueryServicesImplTest.java | 12 ++++-- 4 files changed, 55 insertions(+), 10 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java index a8e1adf643ab..374288de6562 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java @@ -301,6 +301,14 @@ interface StorageClient extends AutoCloseable { /* This method variant collects request count metric, using the fullTableID metadata. */ SplitReadStreamResponse splitReadStream(SplitReadStreamRequest request, String fullTableId); + /** + * Call this method on Work Item thread to report outstanding metrics. + * + *

Because incrementing metrics is only supported on the execution thread, callback thread + * that has pending metrics cannot report it directly. + */ + default void reportPendingMetrics() {} + /** * Close the client object. * diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java index fa680004a7cb..6a0f5398e86e 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java @@ -1653,13 +1653,30 @@ public void cancel() { static class StorageClientImpl implements StorageClient { + public final Counter throttlingMsecs = + Metrics.counter(StorageClientImpl.class, "throttling-msecs"); + + private transient long unreportedDelay = 0L; + + private void addToPendingMetrics(long delay) { + unreportedDelay += delay; + } + + @Override + public void reportPendingMetrics() { + long delay = unreportedDelay; + unreportedDelay = 0L; + + if (delay > 0) { + throttlingMsecs.inc(delay); + } + } + // If client retries ReadRows requests due to RESOURCE_EXHAUSTED error, bump // throttlingMsecs according to delay. Runtime can use this information for // autoscaling decisions. @VisibleForTesting - public static class RetryAttemptCounter implements BigQueryReadSettings.RetryAttemptListener { - public final Counter throttlingMsecs = - Metrics.counter(StorageClientImpl.class, "throttling-msecs"); + class RetryAttemptCounter implements BigQueryReadSettings.RetryAttemptListener { @SuppressWarnings("ProtoDurationGetSecondsGetNano") @Override @@ -1673,7 +1690,7 @@ public void onRetryAttempt(Status status, Metadata metadata) { long delay = retryInfo.getRetryDelay().getSeconds() * 1000 + retryInfo.getRetryDelay().getNanos() / 1000000; - throttlingMsecs.inc(delay); + addToPendingMetrics(delay); } } } @@ -1685,7 +1702,11 @@ public void onRetryAttempt(Status status, Metadata metadata) { private final BigQueryReadClient client; - private StorageClientImpl(BigQueryOptions options) throws IOException { + private final RetryAttemptCounter listener; + + @VisibleForTesting + StorageClientImpl(BigQueryOptions options) throws IOException { + listener = new RetryAttemptCounter(); BigQueryReadSettings.Builder settingsBuilder = BigQueryReadSettings.newBuilder() .setCredentialsProvider(FixedCredentialsProvider.create(options.getGcpCredential())) @@ -1693,7 +1714,7 @@ private StorageClientImpl(BigQueryOptions options) throws IOException { BigQueryReadSettings.defaultGrpcTransportProviderBuilder() .setHeaderProvider(USER_AGENT_HEADER_PROVIDER) .build()) - .setReadRowsRetryAttemptListener(new RetryAttemptCounter()); + .setReadRowsRetryAttemptListener(listener); UnaryCallSettings.Builder createReadSessionSettings = settingsBuilder.getStubSettingsBuilder().createReadSessionSettings(); @@ -1723,6 +1744,11 @@ private StorageClientImpl(BigQueryOptions options) throws IOException { this.client = BigQueryReadClient.create(settingsBuilder.build()); } + @VisibleForTesting + RetryAttemptCounter getListener() { + return listener; + } + // Since BigQueryReadClient client's methods are final they cannot be mocked with Mockito for // testing // So this wrapper method can be mocked in tests, instead. diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageStreamSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageStreamSource.java index 436a00a6b779..cfd670e84e7c 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageStreamSource.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageStreamSource.java @@ -238,7 +238,11 @@ public synchronized boolean advance() throws IOException { private synchronized boolean readNextRecord() throws IOException { Iterator responseIterator = this.responseIterator; while (reader.readyForNextReadResponse()) { - if (!responseIterator.hasNext()) { + // hasNext call has internal retry. Record throttling metrics after called + boolean hasNext = responseIterator.hasNext(); + storageClient.reportPendingMetrics(); + + if (!hasNext) { fractionConsumed = 1d; return false; } @@ -385,6 +389,7 @@ public synchronized BigQueryStorageStreamSource getCurrentSource() { // the SplitReadStream validation logic depends. Removing it will cause incorrect // split operations to succeed. newResponseIterator.hasNext(); + storageClient.reportPendingMetrics(); } catch (FailedPreconditionException e) { // The current source has already moved past the split point, so this split attempt // is unsuccessful. diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java index eb4cff1a5969..5fac8f1a1b4e 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java @@ -2027,9 +2027,11 @@ public Object getTransportCode() { } @Test - public void testRetryAttemptCounter() { - BigQueryServicesImpl.StorageClientImpl.RetryAttemptCounter counter = - new BigQueryServicesImpl.StorageClientImpl.RetryAttemptCounter(); + public void testRetryAttemptCounter() throws IOException { + BigQueryServicesImpl.StorageClientImpl impl = + new BigQueryServicesImpl.StorageClientImpl( + PipelineOptionsFactory.create().as(BigQueryOptions.class)); + BigQueryServicesImpl.StorageClientImpl.RetryAttemptCounter counter = impl.getListener(); RetryInfo retryInfo = RetryInfo.newBuilder() @@ -2071,19 +2073,23 @@ public RetryInfo parseBytes(byte[] serialized) { // Nulls don't bump the counter. counter.onRetryAttempt(null, null); + impl.reportPendingMetrics(); assertEquals(0, (long) container.getCounter(metricName).getCumulative()); // Resource exhausted with empty metadata doesn't bump the counter. counter.onRetryAttempt( Status.RESOURCE_EXHAUSTED.withDescription("You have consumed some quota"), new Metadata()); + impl.reportPendingMetrics(); assertEquals(0, (long) container.getCounter(metricName).getCumulative()); // Resource exhausted with retry info bumps the counter. counter.onRetryAttempt(Status.RESOURCE_EXHAUSTED.withDescription("Stop for a while"), metadata); + impl.reportPendingMetrics(); assertEquals(123456, (long) container.getCounter(metricName).getCumulative()); // Other errors with retry info doesn't bump the counter. counter.onRetryAttempt(Status.UNAVAILABLE.withDescription("Server is gone"), metadata); + impl.reportPendingMetrics(); assertEquals(123456, (long) container.getCounter(metricName).getCumulative()); } }