diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java index a8e1adf643ab..374288de6562 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java @@ -301,6 +301,14 @@ interface StorageClient extends AutoCloseable { /* This method variant collects request count metric, using the fullTableID metadata. */ SplitReadStreamResponse splitReadStream(SplitReadStreamRequest request, String fullTableId); + /** + * Call this method on Work Item thread to report outstanding metrics. + * + *
Because incrementing metrics is only supported on the execution thread, callback thread
+ * that has pending metrics cannot report it directly.
+ */
+ default void reportPendingMetrics() {}
+
/**
* Close the client object.
*
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
index fa680004a7cb..6a0f5398e86e 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
@@ -1653,13 +1653,30 @@ public void cancel() {
static class StorageClientImpl implements StorageClient {
+ public final Counter throttlingMsecs =
+ Metrics.counter(StorageClientImpl.class, "throttling-msecs");
+
+ private transient long unreportedDelay = 0L;
+
+ private void addToPendingMetrics(long delay) {
+ unreportedDelay += delay;
+ }
+
+ @Override
+ public void reportPendingMetrics() {
+ long delay = unreportedDelay;
+ unreportedDelay = 0L;
+
+ if (delay > 0) {
+ throttlingMsecs.inc(delay);
+ }
+ }
+
// If client retries ReadRows requests due to RESOURCE_EXHAUSTED error, bump
// throttlingMsecs according to delay. Runtime can use this information for
// autoscaling decisions.
@VisibleForTesting
- public static class RetryAttemptCounter implements BigQueryReadSettings.RetryAttemptListener {
- public final Counter throttlingMsecs =
- Metrics.counter(StorageClientImpl.class, "throttling-msecs");
+ class RetryAttemptCounter implements BigQueryReadSettings.RetryAttemptListener {
@SuppressWarnings("ProtoDurationGetSecondsGetNano")
@Override
@@ -1673,7 +1690,7 @@ public void onRetryAttempt(Status status, Metadata metadata) {
long delay =
retryInfo.getRetryDelay().getSeconds() * 1000
+ retryInfo.getRetryDelay().getNanos() / 1000000;
- throttlingMsecs.inc(delay);
+ addToPendingMetrics(delay);
}
}
}
@@ -1685,7 +1702,11 @@ public void onRetryAttempt(Status status, Metadata metadata) {
private final BigQueryReadClient client;
- private StorageClientImpl(BigQueryOptions options) throws IOException {
+ private final RetryAttemptCounter listener;
+
+ @VisibleForTesting
+ StorageClientImpl(BigQueryOptions options) throws IOException {
+ listener = new RetryAttemptCounter();
BigQueryReadSettings.Builder settingsBuilder =
BigQueryReadSettings.newBuilder()
.setCredentialsProvider(FixedCredentialsProvider.create(options.getGcpCredential()))
@@ -1693,7 +1714,7 @@ private StorageClientImpl(BigQueryOptions options) throws IOException {
BigQueryReadSettings.defaultGrpcTransportProviderBuilder()
.setHeaderProvider(USER_AGENT_HEADER_PROVIDER)
.build())
- .setReadRowsRetryAttemptListener(new RetryAttemptCounter());
+ .setReadRowsRetryAttemptListener(listener);
UnaryCallSettings.Builder