-
Notifications
You must be signed in to change notification settings - Fork 2.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Flink: Implement enumerator metrics for pending splits, pending recor… #9524
Conversation
@@ -115,4 +115,7 @@ default void onCompletedSplits(Collection<String> completedSplitIds) {} | |||
* snapshots and splits, which defeats the purpose of throttling. | |||
*/ | |||
int pendingSplitCount(); | |||
|
|||
/** Pending records count */ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How good is the split.task().estimatedRowsCount())
which we are using for the calculations? Shall we state in the comments, or in the method name, that the value is estimated? 🤔
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I won't worry too much. the estimated
part comes from a split large file. there is no record count for each split/chunk. hence the record count is estimated based on ratio of the split bytes / file bytes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I still think that the method name should contain estimated
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think of it as more an implementation detail whether it is estimated or not--perhaps there could be a more accurate way of computing this value in the future. Rather than leave the comment in the interface, are you ok if we just leave the comment on the implementing method?
We can leave more hints of how to use this method in the interface
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a public facing interface.
What about:
/** Pending records count. Could be an estimation if exact numbersare not available*/
I am not comfortable stating one thing in the comments/docs and doing other things in the implementation.
@@ -50,4 +51,18 @@ public static MiniClusterWithClientResource createWithClassloaderCheckDisabled() | |||
.setConfiguration(DISABLE_CLASSLOADER_CHECK_CONFIG) | |||
.build()); | |||
} | |||
|
|||
public static MiniClusterWithClientResource createWithClassloaderCheckDisabled( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There's an ongoing process to move from junit4 to junit5 tests. It would be good to add new features to the junit5 tests only
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
in this case, there is no new test class added. maybe junit5 should be handled separately?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am concerned that we are adding new feature to the old testing harness here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
See comments below
@ClassRule | ||
public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE = | ||
MiniClusterResource.createWithClassloaderCheckDisabled(); | ||
MiniClusterResource.createWithClassloaderCheckDisabled(METRIC_REPORTER); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we use junit5 tests for testing?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That would require refactoring this whole class. I can refactor it but that is outside the scope of this PR. Otherwise, I can separate out the metric testing to a new class and use junit5 there.
It's just that these metrics rely on similar logic as continuous iceberg source testing, as to not duplicate tests/code
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about moving the existing test to junit5 in another PR, and rebasing this one above that one.
@nastra: Is there any ongoing work to move this test to junit5?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I haven't seen a PR that would include migrating this class here to JUnit5
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@pvary @stevenzwu @nastra I'm fine with that and I can volunteer to migrate this class to JUnit5 after the PR is merged. I am limited in my bandwidth this week, so I can address it next week, while I would like these metrics to land in the upcoming iceberg release.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree with @mas-chen that the JUnit5 can be tackled as a separate PR. this PR doesn't add any new test classes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@mas-chen: It seems more complicated this way (modify, migrate, remove), but I am mostly interested in the final result 😀
@@ -367,6 +382,8 @@ public void testSpecificSnapshotTimestamp() throws Exception { | |||
|
|||
List<Row> result3 = waitForResult(iter, 2); | |||
TestHelpers.assertRecords(result3, batch3, tableResource.table().schema()); | |||
|
|||
assertThatIcebergEnumeratorMetricsExist(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we want tests for asserting the metrics values too?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess it is probably difficult to reliably assert on the values of unassginedSplits
and pendingRecords
due to timing, unless we can add a listener to the metric reporter to track all value changes for a metric.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We might be able to wait for the expected metrics values, like we do it here:
Line 338 in 13d2160
Awaitility.await() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
polling won't work because the value may have changed from 0 to 1 and back to 0 within a polling interval.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah polling is a challenge. I can do fine-grained unit tests to have better control on when it is invoked. However, I don't think it is possible to assert on a distinct value
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Makes sense. Thanks for the explanation!
* ElapsedTimeGauge#refreshLastRecordedTime()}. | ||
*/ | ||
@Internal | ||
public class ElapsedTimeGauge implements Gauge<Long> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: maybe move to util
package?
@@ -115,4 +115,7 @@ default void onCompletedSplits(Collection<String> completedSplitIds) {} | |||
* snapshots and splits, which defeats the purpose of throttling. | |||
*/ | |||
int pendingSplitCount(); | |||
|
|||
/** Pending records count */ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I won't worry too much. the estimated
part comes from a split large file. there is no record count for each split/chunk. hence the record count is estimated based on ratio of the split bytes / file bytes.
@@ -50,4 +51,18 @@ public static MiniClusterWithClientResource createWithClassloaderCheckDisabled() | |||
.setConfiguration(DISABLE_CLASSLOADER_CHECK_CONFIG) | |||
.build()); | |||
} | |||
|
|||
public static MiniClusterWithClientResource createWithClassloaderCheckDisabled( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
in this case, there is no new test class added. maybe junit5 should be handled separately?
private static void assertThatIcebergEnumeratorMetricsExist() { | ||
assertThatIcebergSourceMetricExists( | ||
"enumerator", "coordinator.enumerator.elapsedSecondsSinceLastSplitDiscovery"); | ||
assertThatIcebergSourceMetricExists("enumerator", "coordinator.enumerator.unassignedSplits"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what about pendingRecords
?
@@ -367,6 +382,8 @@ public void testSpecificSnapshotTimestamp() throws Exception { | |||
|
|||
List<Row> result3 = waitForResult(iter, 2); | |||
TestHelpers.assertRecords(result3, batch3, tableResource.table().schema()); | |||
|
|||
assertThatIcebergEnumeratorMetricsExist(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess it is probably difficult to reliably assert on the values of unassginedSplits
and pendingRecords
due to timing, unless we can add a listener to the metric reporter to track all value changes for a metric.
4c6c7f1
to
0c8da06
Compare
…ds, and split discovery
0c8da06
to
341f9f4
Compare
@mas-chen: Please backport this to 1.17, 1.18 and continue with the Junit5 migration PR. |
@pvary this is only supported by 1.18 and a 1.17 impl would cause a runtime error. As mentioned in the PR description, I don't think backporting is necessary here |
…ds, and split discovery (apache#9524)
…ds, and split discovery (apache#9524)
…ds, and split discovery
Enumerator metrics are now supported in Flink 1.18. NOTE: this will not be backported.