From e5dc5ec9675950ae763b1c5813c2e9e8daa2769a Mon Sep 17 00:00:00 2001
From: Mason Chen
Date: Tue, 30 Jan 2024 00:22:37 -0800
Subject: [PATCH] Flink: Implement enumerator metrics for pending splits,
 pending records, and split discovery (#9524)

---
 .../sink/IcebergFilesCommitterMetrics.java    | 25 +---------
 .../source/assigner/DefaultSplitAssigner.java |  7 +++
 .../flink/source/assigner/SplitAssigner.java  |  6 +++
 .../enumerator/AbstractIcebergEnumerator.java | 10 ++--
 .../ContinuousIcebergEnumerator.java          |  9 ++++
 .../iceberg/flink/util/ElapsedTimeGauge.java  | 47 +++++++++++++++++++
 .../iceberg/flink/MiniClusterResource.java    | 15 ++++++
 .../source/TestIcebergSourceContinuous.java   | 37 ++++++++++++++-
 8 files changed, 127 insertions(+), 29 deletions(-)
 create mode 100644 flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/util/ElapsedTimeGauge.java

diff --git a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitterMetrics.java b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitterMetrics.java
index 9de0d6aaa551..5b28c4acb1c5 100644
--- a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitterMetrics.java
+++ b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitterMetrics.java
@@ -21,8 +21,8 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.flink.metrics.Counter;
-import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.MetricGroup;
+import org.apache.iceberg.flink.util.ElapsedTimeGauge;
 
 class IcebergFilesCommitterMetrics {
   private final AtomicLong lastCheckpointDurationMs = new AtomicLong();
@@ -70,27 +70,4 @@ void updateCommitSummary(CommitSummary stats) {
     committedDeleteFilesRecordCount.inc(stats.deleteFilesRecordCount());
     committedDeleteFilesByteCount.inc(stats.deleteFilesByteCount());
   }
-
-  /**
-   * This gauge measures the elapsed time between now and last recorded time set by {@link
-   * ElapsedTimeGauge#refreshLastRecordedTime()}.
-   */
-  private static class ElapsedTimeGauge implements Gauge<Long> {
-    private final TimeUnit reportUnit;
-    private volatile long lastRecordedTimeNano;
-
-    ElapsedTimeGauge(TimeUnit timeUnit) {
-      this.reportUnit = timeUnit;
-      this.lastRecordedTimeNano = System.nanoTime();
-    }
-
-    void refreshLastRecordedTime() {
-      this.lastRecordedTimeNano = System.nanoTime();
-    }
-
-    @Override
-    public Long getValue() {
-      return reportUnit.convert(System.nanoTime() - lastRecordedTimeNano, TimeUnit.NANOSECONDS);
-    }
-  }
 }
diff --git a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/assigner/DefaultSplitAssigner.java b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/assigner/DefaultSplitAssigner.java
index 37a0f1a6055f..e7447d08c985 100644
--- a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/assigner/DefaultSplitAssigner.java
+++ b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/assigner/DefaultSplitAssigner.java
@@ -103,6 +103,13 @@ public synchronized int pendingSplitCount() {
     return pendingSplits.size();
   }
 
+  @Override
+  public synchronized long pendingRecords() {
+    return pendingSplits.stream()
+        .map(split -> split.task().estimatedRowsCount())
+        .reduce(0L, Long::sum);
+  }
+
   private synchronized void completeAvailableFuturesIfNeeded() {
     if (availableFuture != null && !pendingSplits.isEmpty()) {
       availableFuture.complete(null);
diff --git a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/assigner/SplitAssigner.java b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/assigner/SplitAssigner.java
index ca60612f0ec9..dae7c8cca70c 100644
--- a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/assigner/SplitAssigner.java
+++ b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/assigner/SplitAssigner.java
@@ -115,4 +115,10 @@ default void onCompletedSplits(Collection<String> completedSplitIds) {}
    * snapshots and splits, which defeats the purpose of throttling.
    */
   int pendingSplitCount();
+
+  /**
+   * Return the number of pending records, which can act as a measure of the source lag. This value
+   * could be an estimation if the exact number of records cannot be accurately computed.
+   */
+  long pendingRecords();
 }
diff --git a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/AbstractIcebergEnumerator.java b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/AbstractIcebergEnumerator.java
index 3aca390755ed..6c9a855bc149 100644
--- a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/AbstractIcebergEnumerator.java
+++ b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/AbstractIcebergEnumerator.java
@@ -36,10 +36,6 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-/**
- * TODO: publish enumerator monitor metrics like number of pending metrics after FLINK-21000 is
- * resolved
- */
 abstract class AbstractIcebergEnumerator
     implements SplitEnumerator<IcebergSourceSplit, IcebergEnumeratorState> {
   private static final Logger LOG = LoggerFactory.getLogger(AbstractIcebergEnumerator.class);
@@ -55,6 +51,12 @@ abstract class AbstractIcebergEnumerator
     this.assigner = assigner;
     this.readersAwaitingSplit = new LinkedHashMap<>();
     this.availableFuture = new AtomicReference<>();
+    this.enumeratorContext
+        .metricGroup()
+        // This number may not capture the entire backlog due to split discovery throttling to avoid
+        // excessive memory footprint. Some pending splits may not have been discovered yet.
+        .setUnassignedSplitsGauge(() -> Long.valueOf(assigner.pendingSplitCount()));
+    this.enumeratorContext.metricGroup().gauge("pendingRecords", assigner::pendingRecords);
   }
 
   @Override
diff --git a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousIcebergEnumerator.java b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousIcebergEnumerator.java
index b1dadfb9a69a..ff68103b2b9a 100644
--- a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousIcebergEnumerator.java
+++ b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousIcebergEnumerator.java
@@ -21,6 +21,7 @@
 import java.io.IOException;
 import java.util.Collections;
 import java.util.Objects;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import javax.annotation.Nullable;
 import org.apache.flink.annotation.Internal;
@@ -28,6 +29,7 @@
 import org.apache.iceberg.flink.source.ScanContext;
 import org.apache.iceberg.flink.source.assigner.SplitAssigner;
 import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.flink.util.ElapsedTimeGauge;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -58,6 +60,8 @@ public class ContinuousIcebergEnumerator extends AbstractIcebergEnumerator {
   /** Count the consecutive failures and throw exception if the max allowed failres are reached */
   private transient int consecutiveFailures = 0;
 
+  private final ElapsedTimeGauge elapsedSecondsSinceLastSplitDiscovery;
+
   public ContinuousIcebergEnumerator(
       SplitEnumeratorContext<IcebergSourceSplit> enumeratorContext,
       SplitAssigner assigner,
@@ -72,6 +76,10 @@ public ContinuousIcebergEnumerator(
     this.splitPlanner = splitPlanner;
     this.enumeratorPosition = new AtomicReference<>();
     this.enumerationHistory = new EnumerationHistory(ENUMERATION_SPLIT_COUNT_HISTORY_SIZE);
+    this.elapsedSecondsSinceLastSplitDiscovery = new ElapsedTimeGauge(TimeUnit.SECONDS);
+    this.enumeratorContext
+        .metricGroup()
+        .gauge("elapsedSecondsSinceLastSplitDiscovery", elapsedSecondsSinceLastSplitDiscovery);
 
     if (enumState != null) {
       this.enumeratorPosition.set(enumState.lastEnumeratedPosition());
@@ -139,6 +147,7 @@ private void processDiscoveredSplits(ContinuousEnumerationResult result, Throwab
           enumeratorPosition.get(),
           result.fromPosition());
     } else {
+      elapsedSecondsSinceLastSplitDiscovery.refreshLastRecordedTime();
       // Sometimes, enumeration may yield no splits for a few reasons.
       // - upstream paused or delayed streaming writes to the Iceberg table.
       // - enumeration frequency is higher than the upstream write frequency.
diff --git a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/util/ElapsedTimeGauge.java b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/util/ElapsedTimeGauge.java
new file mode 100644
index 000000000000..6306e82d5729
--- /dev/null
+++ b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/util/ElapsedTimeGauge.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.util;
+
+import java.util.concurrent.TimeUnit;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.metrics.Gauge;
+
+/**
+ * This gauge measures the elapsed time between now and the last recorded time set by {@link
+ * ElapsedTimeGauge#refreshLastRecordedTime()}.
+ */
+@Internal
+public class ElapsedTimeGauge implements Gauge<Long> {
+  private final TimeUnit reportUnit;
+  private volatile long lastRecordedTimeNano;
+
+  public ElapsedTimeGauge(TimeUnit timeUnit) {
+    this.reportUnit = timeUnit;
+    refreshLastRecordedTime();
+  }
+
+  public void refreshLastRecordedTime() {
+    this.lastRecordedTimeNano = System.nanoTime();
+  }
+
+  @Override
+  public Long getValue() {
+    return reportUnit.convert(System.nanoTime() - lastRecordedTimeNano, TimeUnit.NANOSECONDS);
+  }
+}
diff --git a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/MiniClusterResource.java b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/MiniClusterResource.java
index 45af9241b743..399d7aaff64c 100644
--- a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/MiniClusterResource.java
+++ b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/MiniClusterResource.java
@@ -20,6 +20,7 @@
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.runtime.testutils.InMemoryReporter;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.test.util.MiniClusterWithClientResource;
 
@@ -50,4 +51,18 @@ public static MiniClusterWithClientResource createWithClassloaderCheckDisabled()
         .setConfiguration(DISABLE_CLASSLOADER_CHECK_CONFIG)
         .build());
   }
+
+  public static MiniClusterWithClientResource createWithClassloaderCheckDisabled(
+      InMemoryReporter inMemoryReporter) {
+    Configuration configuration =
+        new Configuration(MiniClusterResource.DISABLE_CLASSLOADER_CHECK_CONFIG);
+    inMemoryReporter.addToConfiguration(configuration);
+
+    return new MiniClusterWithClientResource(
+        new MiniClusterResourceConfiguration.Builder()
+            .setNumberTaskManagers(MiniClusterResource.DEFAULT_TM_NUM)
+            .setNumberSlotsPerTaskManager(MiniClusterResource.DEFAULT_PARALLELISM)
+            .setConfiguration(configuration)
+            .build());
+  }
 }
diff --git a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceContinuous.java b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceContinuous.java
index bfd7fa5758e3..61e05e99e14f 100644
--- a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceContinuous.java
+++ b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceContinuous.java
@@ -23,6 +23,7 @@
 import java.time.Duration;
 import java.util.Collection;
 import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 import org.apache.flink.api.common.JobID;
@@ -30,7 +31,9 @@
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.client.JobStatusMessage;
+import org.apache.flink.runtime.testutils.InMemoryReporter;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.data.RowData;
@@ -58,9 +61,11 @@
 public class TestIcebergSourceContinuous {
 
+  public static final InMemoryReporter METRIC_REPORTER = InMemoryReporter.create();
+
   @ClassRule
   public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE =
-      MiniClusterResource.createWithClassloaderCheckDisabled();
+      MiniClusterResource.createWithClassloaderCheckDisabled(METRIC_REPORTER);
 
   @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
 
@@ -112,6 +117,8 @@ public void testTableScanThenIncremental() throws Exception {
 
       List<Row> result3 = waitForResult(iter, 2);
       TestHelpers.assertRecords(result3, batch3, tableResource.table().schema());
+
+      assertThatIcebergEnumeratorMetricsExist();
     }
   }
 
@@ -162,6 +169,8 @@ public void testTableScanThenIncrementalAfterExpiration() throws Exception {
 
       List<Row> result3 = waitForResult(iter, 2);
       TestHelpers.assertRecords(result3, batch3, tableResource.table().schema());
+
+      assertThatIcebergEnumeratorMetricsExist();
     }
   }
 
@@ -211,6 +220,8 @@ public void testEarliestSnapshot() throws Exception {
 
       List<Row> result3 = waitForResult(iter, 2);
       TestHelpers.assertRecords(result3, batch3, tableResource.table().schema());
+
+      assertThatIcebergEnumeratorMetricsExist();
     }
   }
 
@@ -263,6 +274,8 @@ public void testLatestSnapshot() throws Exception {
 
       List<Row> result3 = waitForResult(iter, 2);
       TestHelpers.assertRecords(result3, batch3, tableResource.table().schema());
+
+      assertThatIcebergEnumeratorMetricsExist();
     }
   }
 
@@ -313,6 +326,8 @@ public void testSpecificSnapshotId() throws Exception {
 
       List<Row> result3 = waitForResult(iter, 2);
       TestHelpers.assertRecords(result3, batch3, tableResource.table().schema());
+
+      assertThatIcebergEnumeratorMetricsExist();
     }
   }
 
@@ -367,6 +382,8 @@ public void testSpecificSnapshotTimestamp() throws Exception {
 
       List<Row> result3 = waitForResult(iter, 2);
       TestHelpers.assertRecords(result3, batch3, tableResource.table().schema());
+
+      assertThatIcebergEnumeratorMetricsExist();
     }
   }
 
@@ -505,4 +522,22 @@ public static List<JobID> getRunningJobs(ClusterClient<?> client) throws Excepti
         .map(JobStatusMessage::getJobId)
         .collect(Collectors.toList());
   }
+
+  private static void assertThatIcebergEnumeratorMetricsExist() {
+    assertThatIcebergSourceMetricExists(
+        "enumerator", "coordinator.enumerator.elapsedSecondsSinceLastSplitDiscovery");
+    assertThatIcebergSourceMetricExists("enumerator", "coordinator.enumerator.unassignedSplits");
+    assertThatIcebergSourceMetricExists("enumerator", "coordinator.enumerator.pendingRecords");
+  }
+
+  private static void assertThatIcebergSourceMetricExists(
+      String metricGroupPattern, String metricName) {
+    Optional<MetricGroup> groups = METRIC_REPORTER.findGroup(metricGroupPattern);
+    assertThat(groups).isPresent();
+    assertThat(
+            METRIC_REPORTER.getMetricsByGroup(groups.get()).keySet().stream()
+                .map(name -> groups.get().getMetricIdentifier(name)))
+        .satisfiesOnlyOnce(
+            fullMetricName -> assertThat(fullMetricName).containsSubsequence(metricName));
+  }
 }
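
Note (not part of the patch): a minimal sketch of how the new ElapsedTimeGauge
behaves once this change is applied. The standalone demo class and its main()
harness below are hypothetical; only ElapsedTimeGauge and its two public methods
come from this patch.

    import java.util.concurrent.TimeUnit;
    import org.apache.iceberg.flink.util.ElapsedTimeGauge;

    public class ElapsedTimeGaugeDemo {
      public static void main(String[] args) throws InterruptedException {
        // Reports whole seconds elapsed since the last recorded time.
        ElapsedTimeGauge gauge = new ElapsedTimeGauge(TimeUnit.SECONDS);

        Thread.sleep(2_000);
        // Roughly 2: two seconds have passed since construction.
        System.out.println("elapsedSecondsSinceLastSplitDiscovery = " + gauge.getValue());

        // ContinuousIcebergEnumerator calls this after each successful split
        // discovery, resetting the reference point.
        gauge.refreshLastRecordedTime();
        // Roughly 0: the gauge was just refreshed.
        System.out.println("elapsedSecondsSinceLastSplitDiscovery = " + gauge.getValue());
      }
    }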