Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Flink: Implement enumerator metrics for pending splits, pending recor… #9524

Merged
merged 1 commit into from
Jan 30, 2024

Conversation

mas-chen
Copy link
Contributor

@mas-chen mas-chen commented Jan 19, 2024

…ds, and split discovery

Enumerator metrics are now supported in Flink 1.18. NOTE: this will not be backported.

@github-actions github-actions bot added the flink label Jan 19, 2024
@@ -115,4 +115,7 @@ default void onCompletedSplits(Collection<String> completedSplitIds) {}
* snapshots and splits, which defeats the purpose of throttling.
*/
int pendingSplitCount();

/** Pending records count */
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How good is the split.task().estimatedRowsCount()) which we are using for the calculations? Shall we state in the comments, or in the method name, that the value is estimated? 🤔

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I won't worry too much. the estimated part comes from a split large file. there is no record count for each split/chunk. hence the record count is estimated based on ratio of the split bytes / file bytes.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I still think that the method name should contain estimated.

Copy link
Contributor Author

@mas-chen mas-chen Jan 22, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think of it as more an implementation detail whether it is estimated or not--perhaps there could be a more accurate way of computing this value in the future. Rather than leave the comment in the interface, are you ok if we just leave the comment on the implementing method?

We can leave more hints of how to use this method in the interface

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a public facing interface.
What about:

/** Pending records count. Could be an estimation if exact numbersare not available*/

I am not comfortable stating one thing in the comments/docs and doing other things in the implementation.

@@ -50,4 +51,18 @@ public static MiniClusterWithClientResource createWithClassloaderCheckDisabled()
.setConfiguration(DISABLE_CLASSLOADER_CHECK_CONFIG)
.build());
}

public static MiniClusterWithClientResource createWithClassloaderCheckDisabled(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's an ongoing process to move from junit4 to junit5 tests. It would be good to add new features to the junit5 tests only

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in this case, there is no new test class added. maybe junit5 should be handled separately?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am concerned that we are adding new feature to the old testing harness here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See comments below

@ClassRule
public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE =
MiniClusterResource.createWithClassloaderCheckDisabled();
MiniClusterResource.createWithClassloaderCheckDisabled(METRIC_REPORTER);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we use junit5 tests for testing?

Copy link
Contributor Author

@mas-chen mas-chen Jan 22, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That would require refactoring this whole class. I can refactor it but that is outside the scope of this PR. Otherwise, I can separate out the metric testing to a new class and use junit5 there.

It's just that these metrics rely on similar logic as continuous iceberg source testing, as to not duplicate tests/code

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about moving the existing test to junit5 in another PR, and rebasing this one above that one.

@nastra: Is there any ongoing work to move this test to junit5?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I haven't seen a PR that would include migrating this class here to JUnit5

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@pvary @stevenzwu @nastra I'm fine with that and I can volunteer to migrate this class to JUnit5 after the PR is merged. I am limited in my bandwidth this week, so I can address it next week, while I would like these metrics to land in the upcoming iceberg release.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with @mas-chen that the JUnit5 can be tackled as a separate PR. this PR doesn't add any new test classes.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mas-chen: It seems more complicated this way (modify, migrate, remove), but I am mostly interested in the final result 😀

@@ -367,6 +382,8 @@ public void testSpecificSnapshotTimestamp() throws Exception {

List<Row> result3 = waitForResult(iter, 2);
TestHelpers.assertRecords(result3, batch3, tableResource.table().schema());

assertThatIcebergEnumeratorMetricsExist();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want tests for asserting the metrics values too?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess it is probably difficult to reliably assert on the values of unassginedSplits and pendingRecords due to timing, unless we can add a listener to the metric reporter to track all value changes for a metric.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We might be able to wait for the expected metrics values, like we do it here:

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

polling won't work because the value may have changed from 0 to 1 and back to 0 within a polling interval.

Copy link
Contributor Author

@mas-chen mas-chen Jan 22, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah polling is a challenge. I can do fine-grained unit tests to have better control on when it is invoked. However, I don't think it is possible to assert on a distinct value

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense. Thanks for the explanation!

* ElapsedTimeGauge#refreshLastRecordedTime()}.
*/
@Internal
public class ElapsedTimeGauge implements Gauge<Long> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: maybe move to util package?

@@ -115,4 +115,7 @@ default void onCompletedSplits(Collection<String> completedSplitIds) {}
* snapshots and splits, which defeats the purpose of throttling.
*/
int pendingSplitCount();

/** Pending records count */
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I won't worry too much. the estimated part comes from a split large file. there is no record count for each split/chunk. hence the record count is estimated based on ratio of the split bytes / file bytes.

@@ -50,4 +51,18 @@ public static MiniClusterWithClientResource createWithClassloaderCheckDisabled()
.setConfiguration(DISABLE_CLASSLOADER_CHECK_CONFIG)
.build());
}

public static MiniClusterWithClientResource createWithClassloaderCheckDisabled(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in this case, there is no new test class added. maybe junit5 should be handled separately?

private static void assertThatIcebergEnumeratorMetricsExist() {
assertThatIcebergSourceMetricExists(
"enumerator", "coordinator.enumerator.elapsedSecondsSinceLastSplitDiscovery");
assertThatIcebergSourceMetricExists("enumerator", "coordinator.enumerator.unassignedSplits");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what about pendingRecords?

@@ -367,6 +382,8 @@ public void testSpecificSnapshotTimestamp() throws Exception {

List<Row> result3 = waitForResult(iter, 2);
TestHelpers.assertRecords(result3, batch3, tableResource.table().schema());

assertThatIcebergEnumeratorMetricsExist();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess it is probably difficult to reliably assert on the values of unassginedSplits and pendingRecords due to timing, unless we can add a listener to the metric reporter to track all value changes for a metric.

@pvary pvary merged commit e5dc5ec into apache:main Jan 30, 2024
13 checks passed
@pvary
Copy link
Contributor

pvary commented Jan 30, 2024

@mas-chen: Please backport this to 1.17, 1.18 and continue with the Junit5 migration PR.
Thanks @mas-chen for the PR and @stevenzwu for the review!

@mas-chen
Copy link
Contributor Author

@pvary this is only supported by 1.18 and a 1.17 impl would cause a runtime error. As mentioned in the PR description, I don't think backporting is necessary here

adnanhemani pushed a commit to adnanhemani/iceberg that referenced this pull request Jan 30, 2024
@pvary
Copy link
Contributor

pvary commented Jan 31, 2024

@pvary this is only supported by 1.18 and a 1.17 impl would cause a runtime error. As mentioned in the PR description, I don't think backporting is necessary here

Got it.. Thanks @mas-chen!

devangjhabakh pushed a commit to cdouglas/iceberg that referenced this pull request Apr 22, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants