Flink: Implement enumerator metrics for pending splits, pending recor… #9524
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,47 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
package org.apache.iceberg.flink.util; | ||
|
||
import java.util.concurrent.TimeUnit; | ||
import org.apache.flink.annotation.Internal; | ||
import org.apache.flink.metrics.Gauge; | ||
|
||
/** | ||
* This gauge measures the elapsed time between now and the last recorded time set by {@link | ||
* ElapsedTimeGauge#refreshLastRecordedTime()}. | ||
*/ | ||
@Internal | ||
public class ElapsedTimeGauge implements Gauge<Long> { | ||
private final TimeUnit reportUnit; | ||
private volatile long lastRecordedTimeNano; | ||
|
||
public ElapsedTimeGauge(TimeUnit timeUnit) { | ||
this.reportUnit = timeUnit; | ||
refreshLastRecordedTime(); | ||
} | ||
|
||
public void refreshLastRecordedTime() { | ||
this.lastRecordedTimeNano = System.nanoTime(); | ||
} | ||
|
||
@Override | ||
public Long getValue() { | ||
return reportUnit.convert(System.nanoTime() - lastRecordedTimeNano, TimeUnit.NANOSECONDS); | ||
} | ||
} |
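To make the gauge's behavior concrete, here is a minimal sketch that mirrors the class above but compiles without Flink. Everything in it is an assumption made for illustration and is not part of the PR: `GaugeStandIn` stands in for `org.apache.flink.metrics.Gauge`, the class is renamed `ElapsedTimeGaugeSketch`, and the injectable `LongSupplier` clock replaces `System.nanoTime()` only so that the output is deterministic.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

// Hypothetical demo driver; not part of the PR.
class ElapsedTimeGaugeDemo {
  public static void main(String[] args) {
    long[] fakeNanos = {0L}; // manually advanced clock for a deterministic demo
    ElapsedTimeGaugeSketch seconds =
        new ElapsedTimeGaugeSketch(TimeUnit.SECONDS, () -> fakeNanos[0]);
    ElapsedTimeGaugeSketch minutes =
        new ElapsedTimeGaugeSketch(TimeUnit.MINUTES, () -> fakeNanos[0]);

    fakeNanos[0] += TimeUnit.SECONDS.toNanos(90); // pretend 90 seconds pass
    System.out.println(seconds.getValue()); // 90
    System.out.println(minutes.getValue()); // 1, because TimeUnit.convert truncates

    seconds.refreshLastRecordedTime(); // e.g. called after each split discovery
    System.out.println(seconds.getValue()); // 0
  }
}

// Stand-in for org.apache.flink.metrics.Gauge so the sketch has no dependencies.
interface GaugeStandIn<T> {
  T getValue();
}

// Same logic as ElapsedTimeGauge above, with the clock made injectable (assumption).
class ElapsedTimeGaugeSketch implements GaugeStandIn<Long> {
  private final TimeUnit reportUnit;
  private final LongSupplier nanoClock;
  private volatile long lastRecordedTimeNano;

  ElapsedTimeGaugeSketch(TimeUnit reportUnit, LongSupplier nanoClock) {
    this.reportUnit = reportUnit;
    this.nanoClock = nanoClock;
    refreshLastRecordedTime();
  }

  void refreshLastRecordedTime() {
    this.lastRecordedTimeNano = nanoClock.getAsLong();
  }

  @Override
  public Long getValue() {
    // Elapsed nanoseconds converted to the reporting unit; fractions are dropped.
    return reportUnit.convert(nanoClock.getAsLong() - lastRecordedTimeNano, TimeUnit.NANOSECONDS);
  }
}
```

One consequence worth noting: because `TimeUnit.convert` truncates when converting to a coarser unit, a gauge that reports in a coarse unit reads 0 until a full unit has elapsed.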
Original file line number | Diff line number | Diff line change | ||
---|---|---|---|---|
|
@@ -23,14 +23,17 @@ | |||
import java.time.Duration; | ||||
import java.util.Collection; | ||||
import java.util.List; | ||||
import java.util.Optional; | ||||
import java.util.concurrent.atomic.AtomicLong; | ||||
import java.util.stream.Collectors; | ||||
import org.apache.flink.api.common.JobID; | ||||
import org.apache.flink.api.common.JobStatus; | ||||
import org.apache.flink.api.common.eventtime.WatermarkStrategy; | ||||
import org.apache.flink.api.common.typeinfo.TypeInformation; | ||||
import org.apache.flink.client.program.ClusterClient; | ||||
import org.apache.flink.metrics.MetricGroup; | ||||
import org.apache.flink.runtime.client.JobStatusMessage; | ||||
import org.apache.flink.runtime.testutils.InMemoryReporter; | ||||
import org.apache.flink.streaming.api.datastream.DataStream; | ||||
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; | ||||
import org.apache.flink.table.data.RowData; | ||||
|
@@ -58,9 +61,11 @@ | |||
|
||||
public class TestIcebergSourceContinuous { | ||||
|
||||
public static final InMemoryReporter METRIC_REPORTER = InMemoryReporter.create(); | ||||
|
||||
@ClassRule | ||||
public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE = | ||||
MiniClusterResource.createWithClassloaderCheckDisabled(); | ||||
MiniClusterResource.createWithClassloaderCheckDisabled(METRIC_REPORTER); | ||||
Could we use junit5 tests for testing?
That would require refactoring this whole class. I can refactor it, but that is outside the scope of this PR. Otherwise, I can separate out the metric testing into a new class and use junit5 there. These metrics rely on logic similar to the continuous Iceberg source tests, so I added the checks here to avoid duplicating tests/code.
How about moving the existing test to junit5 in another PR, and rebasing this one on top of that one? @nastra: Is there any ongoing work to move this test to junit5?
I haven't seen a PR that would include migrating this class to JUnit5.
@pvary @stevenzwu @nastra I'm fine with that, and I can volunteer to migrate this class to JUnit5 after the PR is merged. My bandwidth is limited this week, so I can address it next week, but I would like these metrics to land in the upcoming Iceberg release.
I agree with @mas-chen that the JUnit5 migration can be tackled as a separate PR. This PR doesn't add any new test classes.
@mas-chen: It seems more complicated this way (modify, migrate, remove), but I am mostly interested in the final result 😀
||||
|
||||
@ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); | ||||
|
||||
|
@@ -112,6 +117,8 @@ public void testTableScanThenIncremental() throws Exception { | |||
|
||||
List<Row> result3 = waitForResult(iter, 2); | ||||
TestHelpers.assertRecords(result3, batch3, tableResource.table().schema()); | ||||
|
||||
assertThatIcebergEnumeratorMetricsExist(); | ||||
} | ||||
} | ||||
|
||||
|
@@ -162,6 +169,8 @@ public void testTableScanThenIncrementalAfterExpiration() throws Exception { | |||
|
||||
List<Row> result3 = waitForResult(iter, 2); | ||||
TestHelpers.assertRecords(result3, batch3, tableResource.table().schema()); | ||||
|
||||
assertThatIcebergEnumeratorMetricsExist(); | ||||
} | ||||
} | ||||
|
||||
|
@@ -211,6 +220,8 @@ public void testEarliestSnapshot() throws Exception { | |||
|
||||
List<Row> result3 = waitForResult(iter, 2); | ||||
TestHelpers.assertRecords(result3, batch3, tableResource.table().schema()); | ||||
|
||||
assertThatIcebergEnumeratorMetricsExist(); | ||||
} | ||||
} | ||||
|
||||
|
@@ -263,6 +274,8 @@ public void testLatestSnapshot() throws Exception { | |||
|
||||
List<Row> result3 = waitForResult(iter, 2); | ||||
TestHelpers.assertRecords(result3, batch3, tableResource.table().schema()); | ||||
|
||||
assertThatIcebergEnumeratorMetricsExist(); | ||||
} | ||||
} | ||||
|
||||
|
@@ -313,6 +326,8 @@ public void testSpecificSnapshotId() throws Exception { | |||
|
||||
List<Row> result3 = waitForResult(iter, 2); | ||||
TestHelpers.assertRecords(result3, batch3, tableResource.table().schema()); | ||||
|
||||
assertThatIcebergEnumeratorMetricsExist(); | ||||
} | ||||
} | ||||
|
||||
|
@@ -367,6 +382,8 @@ public void testSpecificSnapshotTimestamp() throws Exception { | |||
|
||||
List<Row> result3 = waitForResult(iter, 2); | ||||
TestHelpers.assertRecords(result3, batch3, tableResource.table().schema()); | ||||
|
||||
assertThatIcebergEnumeratorMetricsExist(); | ||||
Do we want tests for asserting the metrics values too?
I guess it is probably difficult to reliably assert on the values of
We might be able to wait for the expected metrics values, like we do it here: Line 338 in 13d2160
Polling won't work because the value may have changed from 0 to 1 and back to 0 within a polling interval.
Yeah, polling is a challenge. I can do fine-grained unit tests to have better control over when it is invoked. However, I don't think it is possible to assert on a distinct value.
Makes sense. Thanks for the explanation!
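The objection to polling in the thread above can be illustrated with a toy timeline. This is only a sketch under stated assumptions: the timeline, the class name `PollingMissesSpikeDemo`, and the `poll` helper are hypothetical and do not use any Flink or Iceberg API. A gauge value that goes from 0 to 1 and back to 0 between two reads is never seen by a poller with a coarser interval.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration; not part of the PR.
class PollingMissesSpikeDemo {
  // Returns the values a poller observes when it reads the gauge every `interval` ticks.
  static List<Integer> poll(int[] timeline, int interval) {
    List<Integer> observed = new ArrayList<>();
    for (int tick = 0; tick < timeline.length; tick += interval) {
      observed.add(timeline[tick]);
    }
    return observed;
  }

  public static void main(String[] args) {
    // Gauge value per tick: it rises to 1 at tick 1 and is back to 0 at tick 2.
    int[] timeline = {0, 1, 0, 0, 0, 0};
    System.out.println(poll(timeline, 3)); // [0, 0] -> the spike is missed
    System.out.println(poll(timeline, 1)); // [0, 1, 0, 0, 0, 0]
  }
}
```

Reading on every tick is not an option in an integration test, which is consistent with the suggestion in the thread to cover values in fine-grained unit tests where the invocation points are controlled.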
||||
} | ||||
} | ||||
|
||||
|
@@ -418,4 +435,22 @@ public static List<JobID> getRunningJobs(ClusterClient<?> client) throws Excepti | |||
.map(JobStatusMessage::getJobId) | ||||
.collect(Collectors.toList()); | ||||
} | ||||
|
||||
private static void assertThatIcebergEnumeratorMetricsExist() { | ||||
assertThatIcebergSourceMetricExists( | ||||
"enumerator", "coordinator.enumerator.elapsedSecondsSinceLastSplitDiscovery"); | ||||
assertThatIcebergSourceMetricExists("enumerator", "coordinator.enumerator.unassignedSplits"); | ||||
what about
||||
assertThatIcebergSourceMetricExists("enumerator", "coordinator.enumerator.pendingRecords"); | ||||
} | ||||
|
||||
private static void assertThatIcebergSourceMetricExists( | ||||
String metricGroupPattern, String metricName) { | ||||
Optional<MetricGroup> groups = METRIC_REPORTER.findGroup(metricGroupPattern); | ||||
assertThat(groups).isPresent(); | ||||
assertThat( | ||||
METRIC_REPORTER.getMetricsByGroup(groups.get()).keySet().stream() | ||||
.map(name -> groups.get().getMetricIdentifier(name))) | ||||
.satisfiesOnlyOnce( | ||||
fullMetricName -> assertThat(fullMetricName).containsSubsequence(metricName)); | ||||
} | ||||
} |
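The existence check above amounts to "exactly one registered metric identifier contains the expected name". A dependency-free sketch of that condition follows; the class name, the helper, and the sample identifiers are hypothetical, and the real identifiers produced by Flink's `getMetricIdentifier` will look different.

```java
import java.util.List;

// Hypothetical illustration; not part of the PR.
class MetricExistsSketch {
  // True when exactly one identifier contains the expected metric name.
  static boolean existsExactlyOnce(List<String> identifiers, String metricName) {
    return identifiers.stream().filter(id -> id.contains(metricName)).count() == 1;
  }

  public static void main(String[] args) {
    // Made-up identifiers, only shaped like scoped metric names.
    List<String> identifiers =
        List.of(
            "host.jobmanager.job.enumerator.coordinator.enumerator.unassignedSplits",
            "host.jobmanager.job.enumerator.coordinator.enumerator.pendingRecords");

    System.out.println(
        existsExactlyOnce(identifiers, "coordinator.enumerator.pendingRecords")); // true
    System.out.println(
        existsExactlyOnce(identifiers, "coordinator.enumerator")); // false, two matches
  }
}
```

Requiring exactly one match, rather than at least one, also catches a metric that is accidentally registered twice under the same scope.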
There's an ongoing process to move from junit4 to junit5 tests. It would be good to add new features to the junit5 tests only
In this case, no new test class is added. Maybe junit5 should be handled separately?
I am concerned that we are adding a new feature to the old testing harness here.
See comments below