Flink: Implement enumerator metrics for pending splits, pending records, and split discovery (#9524)

mas-chen authored Jan 30, 2024
1 parent 3be1939 commit e5dc5ec
Showing 8 changed files with 127 additions and 29 deletions.
[File 1 of 8]
@@ -21,8 +21,8 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;
import org.apache.iceberg.flink.util.ElapsedTimeGauge;

class IcebergFilesCommitterMetrics {
private final AtomicLong lastCheckpointDurationMs = new AtomicLong();
@@ -70,27 +70,4 @@ void updateCommitSummary(CommitSummary stats) {
committedDeleteFilesRecordCount.inc(stats.deleteFilesRecordCount());
committedDeleteFilesByteCount.inc(stats.deleteFilesByteCount());
}

/**
* This gauge measures the elapsed time between now and the last recorded time set by {@link
* ElapsedTimeGauge#refreshLastRecordedTime()}.
*/
private static class ElapsedTimeGauge implements Gauge<Long> {
private final TimeUnit reportUnit;
private volatile long lastRecordedTimeNano;

ElapsedTimeGauge(TimeUnit timeUnit) {
this.reportUnit = timeUnit;
this.lastRecordedTimeNano = System.nanoTime();
}

void refreshLastRecordedTime() {
this.lastRecordedTimeNano = System.nanoTime();
}

@Override
public Long getValue() {
return reportUnit.convert(System.nanoTime() - lastRecordedTimeNano, TimeUnit.NANOSECONDS);
}
}
}
[File 2 of 8]
@@ -103,6 +103,13 @@ public synchronized int pendingSplitCount() {
return pendingSplits.size();
}

@Override
public synchronized long pendingRecords() {
return pendingSplits.stream()
.map(split -> split.task().estimatedRowsCount())
.reduce(0L, Long::sum);
}

private synchronized void completeAvailableFuturesIfNeeded() {
if (availableFuture != null && !pendingSplits.isEmpty()) {
availableFuture.complete(null);
[File 3 of 8]
@@ -115,4 +115,10 @@ default void onCompletedSplits(Collection<String> completedSplitIds) {}
* snapshots and splits, which defeats the purpose of throttling.
*/
int pendingSplitCount();

/**
* Return the number of pending records, which can serve as a measure of source lag. This value
* may be an estimate if the exact number of records cannot be computed accurately.
*/
long pendingRecords();
}
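
To make the new contract concrete, here is a minimal sketch of a pendingRecords() implementation for a custom assigner, mirroring the default assigner's version in the previous file; pendingSplits is an assumed field holding the assigner's internal collection of unassigned IcebergSourceSplit instances:

@Override
public synchronized long pendingRecords() {
  // "pendingSplits" is an assumed internal queue of unassigned splits;
  // synchronized mirrors the locking of pendingSplitCount().
  return pendingSplits.stream()
      // estimatedRowsCount() is derived from file metadata, so the gauge
      // reports an estimate of source lag rather than an exact record count.
      .mapToLong(split -> split.task().estimatedRowsCount())
      .sum();
}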
[File 4 of 8]
@@ -36,10 +36,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* TODO: publish enumerator monitor metrics like number of pending metrics after FLINK-21000 is
* resolved
*/
abstract class AbstractIcebergEnumerator
implements SplitEnumerator<IcebergSourceSplit, IcebergEnumeratorState> {
private static final Logger LOG = LoggerFactory.getLogger(AbstractIcebergEnumerator.class);
@@ -55,6 +51,12 @@ abstract class AbstractIcebergEnumerator
this.assigner = assigner;
this.readersAwaitingSplit = new LinkedHashMap<>();
this.availableFuture = new AtomicReference<>();
this.enumeratorContext
.metricGroup()
// This number may not capture the entire backlog, because split discovery is throttled to
// avoid an excessive memory footprint; some pending splits may not have been discovered yet.
.setUnassignedSplitsGauge(() -> Long.valueOf(assigner.pendingSplitCount()));
this.enumeratorContext.metricGroup().gauge("pendingRecords", assigner::pendingRecords);
}

@Override
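
Both gauges are registered on the enumerator's metric group, so they surface under the source coordinator scope; the tests at the end of this commit assert identifiers containing coordinator.enumerator.unassignedSplits and coordinator.enumerator.pendingRecords. A test-side sketch of reading one of them, assuming an InMemoryReporter named reporter has been wired into the mini cluster configuration (as in the test changes below):

// Locate the enumerator metric group and read the custom pendingRecords gauge.
// "reporter" is an assumed InMemoryReporter attached to the mini cluster.
MetricGroup enumeratorGroup = reporter.findGroup("enumerator").orElseThrow();
Gauge<?> pendingRecords =
    (Gauge<?>) reporter.getMetricsByGroup(enumeratorGroup).get("pendingRecords");
long estimatedLag = (Long) pendingRecords.getValue();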
[File 5 of 8]
@@ -21,13 +21,15 @@
import java.io.IOException;
import java.util.Collections;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.iceberg.flink.source.ScanContext;
import org.apache.iceberg.flink.source.assigner.SplitAssigner;
import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
import org.apache.iceberg.flink.util.ElapsedTimeGauge;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -58,6 +60,8 @@ public class ContinuousIcebergEnumerator extends AbstractIcebergEnumerator {
/** Count the consecutive failures and throw an exception if the max allowed failures are reached. */
private transient int consecutiveFailures = 0;

private final ElapsedTimeGauge elapsedSecondsSinceLastSplitDiscovery;

public ContinuousIcebergEnumerator(
SplitEnumeratorContext<IcebergSourceSplit> enumeratorContext,
SplitAssigner assigner,
@@ -72,6 +76,10 @@ public ContinuousIcebergEnumerator(
this.splitPlanner = splitPlanner;
this.enumeratorPosition = new AtomicReference<>();
this.enumerationHistory = new EnumerationHistory(ENUMERATION_SPLIT_COUNT_HISTORY_SIZE);
this.elapsedSecondsSinceLastSplitDiscovery = new ElapsedTimeGauge(TimeUnit.SECONDS);
this.enumeratorContext
.metricGroup()
.gauge("elapsedSecondsSinceLastSplitDiscovery", elapsedSecondsSinceLastSplitDiscovery);

if (enumState != null) {
this.enumeratorPosition.set(enumState.lastEnumeratedPosition());
@@ -139,6 +147,7 @@ private void processDiscoveredSplits(ContinuousEnumerationResult result, Throwab
enumeratorPosition.get(),
result.fromPosition());
} else {
elapsedSecondsSinceLastSplitDiscovery.refreshLastRecordedTime();
// Sometimes, enumeration may yield no splits for a few reasons.
// - upstream paused or delayed streaming writes to the Iceberg table.
// - enumeration frequency is higher than the upstream write frequency.
[File 6 of 8]
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.flink.util;

import java.util.concurrent.TimeUnit;
import org.apache.flink.annotation.Internal;
import org.apache.flink.metrics.Gauge;

/**
* This gauge measures the elapsed time between now and the last recorded time set by {@link
* ElapsedTimeGauge#refreshLastRecordedTime()}.
*/
@Internal
public class ElapsedTimeGauge implements Gauge<Long> {
private final TimeUnit reportUnit;
private volatile long lastRecordedTimeNano;

public ElapsedTimeGauge(TimeUnit timeUnit) {
this.reportUnit = timeUnit;
refreshLastRecordedTime();
}

public void refreshLastRecordedTime() {
this.lastRecordedTimeNano = System.nanoTime();
}

@Override
public Long getValue() {
return reportUnit.convert(System.nanoTime() - lastRecordedTimeNano, TimeUnit.NANOSECONDS);
}
}
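
Usage follows a register-then-refresh pattern, as ContinuousIcebergEnumerator above does for split discovery. A minimal sketch, where metricGroup is any org.apache.flink.metrics.MetricGroup:

// Register the gauge once; it starts measuring from construction time.
ElapsedTimeGauge discoveryGauge = new ElapsedTimeGauge(TimeUnit.SECONDS);
metricGroup.gauge("elapsedSecondsSinceLastSplitDiscovery", discoveryGauge);

// After each successful discovery, reset the reference point; the gauge then
// reports the seconds elapsed since that event on every metric scrape.
discoveryGauge.refreshLastRecordedTime();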
[File 7 of 8]
@@ -20,6 +20,7 @@

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.runtime.testutils.InMemoryReporter;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.test.util.MiniClusterWithClientResource;

@@ -50,4 +51,18 @@ public static MiniClusterWithClientResource createWithClassloaderCheckDisabled()
.setConfiguration(DISABLE_CLASSLOADER_CHECK_CONFIG)
.build());
}

public static MiniClusterWithClientResource createWithClassloaderCheckDisabled(
InMemoryReporter inMemoryReporter) {
Configuration configuration =
new Configuration(MiniClusterResource.DISABLE_CLASSLOADER_CHECK_CONFIG);
inMemoryReporter.addToConfiguration(configuration);

return new MiniClusterWithClientResource(
new MiniClusterResourceConfiguration.Builder()
.setNumberTaskManagers(MiniClusterResource.DEFAULT_TM_NUM)
.setNumberSlotsPerTaskManager(MiniClusterResource.DEFAULT_PARALLELISM)
.setConfiguration(configuration)
.build());
}
}
[File 8 of 8]
@@ -23,14 +23,17 @@
import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.testutils.InMemoryReporter;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
@@ -58,9 +61,11 @@

public class TestIcebergSourceContinuous {

public static final InMemoryReporter METRIC_REPORTER = InMemoryReporter.create();

@ClassRule
public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE =
MiniClusterResource.createWithClassloaderCheckDisabled();
MiniClusterResource.createWithClassloaderCheckDisabled(METRIC_REPORTER);

@ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();

@@ -112,6 +117,8 @@ public void testTableScanThenIncremental() throws Exception {

List<Row> result3 = waitForResult(iter, 2);
TestHelpers.assertRecords(result3, batch3, tableResource.table().schema());

assertThatIcebergEnumeratorMetricsExist();
}
}

@@ -162,6 +169,8 @@ public void testTableScanThenIncrementalAfterExpiration() throws Exception {

List<Row> result3 = waitForResult(iter, 2);
TestHelpers.assertRecords(result3, batch3, tableResource.table().schema());

assertThatIcebergEnumeratorMetricsExist();
}
}

@@ -211,6 +220,8 @@ public void testEarliestSnapshot() throws Exception {

List<Row> result3 = waitForResult(iter, 2);
TestHelpers.assertRecords(result3, batch3, tableResource.table().schema());

assertThatIcebergEnumeratorMetricsExist();
}
}

@@ -263,6 +274,8 @@ public void testLatestSnapshot() throws Exception {

List<Row> result3 = waitForResult(iter, 2);
TestHelpers.assertRecords(result3, batch3, tableResource.table().schema());

assertThatIcebergEnumeratorMetricsExist();
}
}

@@ -313,6 +326,8 @@ public void testSpecificSnapshotId() throws Exception {

List<Row> result3 = waitForResult(iter, 2);
TestHelpers.assertRecords(result3, batch3, tableResource.table().schema());

assertThatIcebergEnumeratorMetricsExist();
}
}

@@ -367,6 +382,8 @@ public void testSpecificSnapshotTimestamp() throws Exception {

List<Row> result3 = waitForResult(iter, 2);
TestHelpers.assertRecords(result3, batch3, tableResource.table().schema());

assertThatIcebergEnumeratorMetricsExist();
}
}

@@ -505,4 +522,22 @@ public static List<JobID> getRunningJobs(ClusterClient<?> client) throws Excepti
.map(JobStatusMessage::getJobId)
.collect(Collectors.toList());
}

private static void assertThatIcebergEnumeratorMetricsExist() {
assertThatIcebergSourceMetricExists(
"enumerator", "coordinator.enumerator.elapsedSecondsSinceLastSplitDiscovery");
assertThatIcebergSourceMetricExists("enumerator", "coordinator.enumerator.unassignedSplits");
assertThatIcebergSourceMetricExists("enumerator", "coordinator.enumerator.pendingRecords");
}

private static void assertThatIcebergSourceMetricExists(
String metricGroupPattern, String metricName) {
Optional<MetricGroup> groups = METRIC_REPORTER.findGroup(metricGroupPattern);
assertThat(groups).isPresent();
assertThat(
METRIC_REPORTER.getMetricsByGroup(groups.get()).keySet().stream()
.map(name -> groups.get().getMetricIdentifier(name)))
.satisfiesOnlyOnce(
fullMetricName -> assertThat(fullMetricName).containsSubsequence(metricName));
}
}
