Skip to content

Commit

Permalink
Fix reporting metrics not supported warning for BigQueryIO Direct read (
Browse files Browse the repository at this point in the history
apache#31096)

When throttled. This fixes throttling based autoscaling for BigQueryIO
DIRECT_READ pipeline on Dataflow
  • Loading branch information
Abacn authored Apr 26, 2024
1 parent 3e5a658 commit c20e329
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,14 @@ interface StorageClient extends AutoCloseable {
/* This method variant collects request count metric, using the fullTableID metadata. */
SplitReadStreamResponse splitReadStream(SplitReadStreamRequest request, String fullTableId);

/**
* Call this method on Work Item thread to report outstanding metrics.
*
* <p>Because incrementing metrics is only supported on the execution thread, callback thread
* that has pending metrics cannot report it directly.
*/
default void reportPendingMetrics() {}

/**
* Close the client object.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1653,13 +1653,30 @@ public void cancel() {

static class StorageClientImpl implements StorageClient {

public final Counter throttlingMsecs =
Metrics.counter(StorageClientImpl.class, "throttling-msecs");

private transient long unreportedDelay = 0L;

private void addToPendingMetrics(long delay) {
unreportedDelay += delay;
}

@Override
public void reportPendingMetrics() {
long delay = unreportedDelay;
unreportedDelay = 0L;

if (delay > 0) {
throttlingMsecs.inc(delay);
}
}

// If client retries ReadRows requests due to RESOURCE_EXHAUSTED error, bump
// throttlingMsecs according to delay. Runtime can use this information for
// autoscaling decisions.
@VisibleForTesting
public static class RetryAttemptCounter implements BigQueryReadSettings.RetryAttemptListener {
public final Counter throttlingMsecs =
Metrics.counter(StorageClientImpl.class, "throttling-msecs");
class RetryAttemptCounter implements BigQueryReadSettings.RetryAttemptListener {

@SuppressWarnings("ProtoDurationGetSecondsGetNano")
@Override
Expand All @@ -1673,7 +1690,7 @@ public void onRetryAttempt(Status status, Metadata metadata) {
long delay =
retryInfo.getRetryDelay().getSeconds() * 1000
+ retryInfo.getRetryDelay().getNanos() / 1000000;
throttlingMsecs.inc(delay);
addToPendingMetrics(delay);
}
}
}
Expand All @@ -1685,15 +1702,19 @@ public void onRetryAttempt(Status status, Metadata metadata) {

private final BigQueryReadClient client;

private StorageClientImpl(BigQueryOptions options) throws IOException {
private final RetryAttemptCounter listener;

@VisibleForTesting
StorageClientImpl(BigQueryOptions options) throws IOException {
listener = new RetryAttemptCounter();
BigQueryReadSettings.Builder settingsBuilder =
BigQueryReadSettings.newBuilder()
.setCredentialsProvider(FixedCredentialsProvider.create(options.getGcpCredential()))
.setTransportChannelProvider(
BigQueryReadSettings.defaultGrpcTransportProviderBuilder()
.setHeaderProvider(USER_AGENT_HEADER_PROVIDER)
.build())
.setReadRowsRetryAttemptListener(new RetryAttemptCounter());
.setReadRowsRetryAttemptListener(listener);

UnaryCallSettings.Builder<CreateReadSessionRequest, ReadSession> createReadSessionSettings =
settingsBuilder.getStubSettingsBuilder().createReadSessionSettings();
Expand Down Expand Up @@ -1723,6 +1744,11 @@ private StorageClientImpl(BigQueryOptions options) throws IOException {
this.client = BigQueryReadClient.create(settingsBuilder.build());
}

@VisibleForTesting
RetryAttemptCounter getListener() {
return listener;
}

// Since BigQueryReadClient client's methods are final they cannot be mocked with Mockito for
// testing
// So this wrapper method can be mocked in tests, instead.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,11 @@ public synchronized boolean advance() throws IOException {
private synchronized boolean readNextRecord() throws IOException {
Iterator<ReadRowsResponse> responseIterator = this.responseIterator;
while (reader.readyForNextReadResponse()) {
if (!responseIterator.hasNext()) {
// hasNext call has internal retry. Record throttling metrics after called
boolean hasNext = responseIterator.hasNext();
storageClient.reportPendingMetrics();

if (!hasNext) {
fractionConsumed = 1d;
return false;
}
Expand Down Expand Up @@ -385,6 +389,7 @@ public synchronized BigQueryStorageStreamSource<T> getCurrentSource() {
// the SplitReadStream validation logic depends. Removing it will cause incorrect
// split operations to succeed.
newResponseIterator.hasNext();
storageClient.reportPendingMetrics();
} catch (FailedPreconditionException e) {
// The current source has already moved past the split point, so this split attempt
// is unsuccessful.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2027,9 +2027,11 @@ public Object getTransportCode() {
}

@Test
public void testRetryAttemptCounter() {
BigQueryServicesImpl.StorageClientImpl.RetryAttemptCounter counter =
new BigQueryServicesImpl.StorageClientImpl.RetryAttemptCounter();
public void testRetryAttemptCounter() throws IOException {
BigQueryServicesImpl.StorageClientImpl impl =
new BigQueryServicesImpl.StorageClientImpl(
PipelineOptionsFactory.create().as(BigQueryOptions.class));
BigQueryServicesImpl.StorageClientImpl.RetryAttemptCounter counter = impl.getListener();

RetryInfo retryInfo =
RetryInfo.newBuilder()
Expand Down Expand Up @@ -2071,19 +2073,23 @@ public RetryInfo parseBytes(byte[] serialized) {

// Nulls don't bump the counter.
counter.onRetryAttempt(null, null);
impl.reportPendingMetrics();
assertEquals(0, (long) container.getCounter(metricName).getCumulative());

// Resource exhausted with empty metadata doesn't bump the counter.
counter.onRetryAttempt(
Status.RESOURCE_EXHAUSTED.withDescription("You have consumed some quota"), new Metadata());
impl.reportPendingMetrics();
assertEquals(0, (long) container.getCounter(metricName).getCumulative());

// Resource exhausted with retry info bumps the counter.
counter.onRetryAttempt(Status.RESOURCE_EXHAUSTED.withDescription("Stop for a while"), metadata);
impl.reportPendingMetrics();
assertEquals(123456, (long) container.getCounter(metricName).getCumulative());

// Other errors with retry info doesn't bump the counter.
counter.onRetryAttempt(Status.UNAVAILABLE.withDescription("Server is gone"), metadata);
impl.reportPendingMetrics();
assertEquals(123456, (long) container.getCounter(metricName).getCumulative());
}
}

0 comments on commit c20e329

Please sign in to comment.