Flink: Empty implementation for pauseOrResumeSplits to prevent UnsupportedOperationException #9308
Conversation
CC: @ajantha-bhat - further fixes on the test
flink/v1.18/build.gradle
@@ -63,6 +63,7 @@ project(":iceberg-flink:iceberg-flink-${flinkMajorVersion}") {
     // These artifacts are shaded and included in the orc-core fat jar
     exclude group: 'com.google.protobuf', module: 'protobuf-java'
     exclude group: 'org.apache.hive', module: 'hive-storage-api'
+    exclude group: 'org.slf4j'
out of curiosity: why was this needed?
We specify 2.x slf4j as an Iceberg dependency, but these dependencies carry a 1.x slf4j transitive dependency. When both 1.x and 2.x slf4j are on the classpath, the loggers do not work, so it is very hard to follow what happens during the tests.
some minor comments, otherwise LGTM
@pvary is the whole PR addressing the flaky test? If not, please separate out the flaky test part (hopefully small) into a separate PR. I would like to merge the flaky test fix ASAP. And you can include all 3 versions (1.16, 1.17, 1.18) in one PR.
As mentioned in the description, the missing …
@pvary we are changing some critical code here. I would prefer to take a little more time and get more people to review.
Hence I would prefer to disable/ignore the test first, so that we make other PR/CI builds stable ASAP.
That is why I created #9309. That contains the most important other fixes.
@stevenzwu: Here is what I have found:
These messages are executed in the same thread as the fetch method, so in this case we have to return from the fetch loop, even with empty results. This is somewhat concerning to me, considering the comment here: lines 76 to 78 in 820fc3c.
I do not see that it is closed, but probably you know more...
I have checked, and the idleness is defined like this:
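(The quoted snippet did not survive the copy here; paraphrasing from memory of Flink's SplitFetcher, the condition is roughly the following sketch, not a verbatim quote:)

boolean shouldIdle() {
  // A fetcher counts as idle only when it has no assigned splits and no queued
  // tasks (split changes, pause/resume requests), so pending tasks keep it alive.
  return assignedSplits.isEmpty() && taskQueue.isEmpty();
}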
So if we have tasks (split manipulation/pause), then we are not idle, so the Fetcher is not removed. The Fetcher also handles the …
// Unpause the reading of the split, and check the result size
reader.pauseOrResumeSplits(ImmutableList.of(), ImmutableList.of(split));
result = (ArrayBatchRecords) reader.fetch();
assertThat(result.numberOfRecords()).isEqualTo(3);
Suggested change:
-assertThat(result.numberOfRecords()).isEqualTo(3);
+assertThat(result.numberOfRecords()).isGreaterThan(2);
For test stability, I think fetch may not be guaranteed to return 1 record after pausing.
The reader reads the records in batches. The size of the batches is governed by:
public static final ConfigOption<Integer> SOURCE_READER_FETCH_BATCH_RECORD_COUNT =
    ConfigOptions.key("table.exec.iceberg.fetch-batch-record-count")
        .intType()
        .defaultValue(2048)
        .withDescription("The target number of records for Iceberg reader fetch batch.");
The default is 2048, so all of the records are read here.
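(As an illustration of the batching behavior, a hypothetical usage sketch, not code from this PR; FlinkConfigOptions is the Iceberg class quoted above:)

// With a batch size of 1, the 3 records above would arrive over three
// separate fetch() calls instead of a single one.
Configuration config = new Configuration();
config.set(FlinkConfigOptions.SOURCE_READER_FETCH_BATCH_RECORD_COUNT, 1);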
// Unpause and read the data in the next fetch
reader.pauseOrResumeSplits(ImmutableList.of(), ImmutableList.of(split));
result = (ArrayBatchRecords) reader.fetch();
assertThat(result.numberOfRecords()).isEqualTo(3);
Ditto, same comment here.
We will get all 3 records
BTW, should we add an integration test?
That reminds me to remove the … The …
@@ -383,8 +381,10 @@ protected IcebergSource<RowData> source() {
     .project(TestFixtures.TS_SCHEMA)
     .splitSize(100L)
     .streaming(true)
-    .monitorInterval(Duration.ofMillis(2))
+    .monitorInterval(Duration.ofMillis(100))
Added this because the 2 ms interval was causing too much load and ended up causing flakiness:
[collect-sink-operator-coordinator-executor-thread-pool-thread-1] WARN org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorCoordinator - Collect sink coordinator encounters a CompletionException: java.net.SocketTimeoutException: Read timed out
2 ms may cause high CPU usage, but why would it cause flakiness?
Let's change it to 10 ms then. A smaller value can discover new files sooner and make the test run faster. We already have a lot of integration tests using 10 ms and they are running fine.
I ran the tests throughout the whole day yesterday, and found that the previous fix removed most of the flakiness, but we still have some issues which fail about once in 1000-2000 runs.
In the failure cases the records are emitted by the emitter, but none of them arrive at the Sink. In this case I see the above SocketTimeout error on the sink coordinator in the logs, and retries - this appears multiple times before the timeout. Also the log is swamped with the planner logs.
After increasing the monitoring interval, I had 3500 successful runs in a row, when I decided that I needed to spend my time elsewhere. As you can imagine this is nothing conclusive, and given the failure rate, nothing that could be tracked easily, but it seems like a good improvement.
  wakeUp.wait(1000);
} catch (InterruptedException e) {
  Thread.currentThread().interrupt();
  throw new RuntimeException("Interrupted while blocked on reading.", e);
Is it correct to throw RuntimeException? Why don't we just propagate the InterruptedException per the contract?
 * @throws IOException when encountered IO errors, such as deserialization failures.
 */
RecordsWithSplitIds<E> fetch() throws IOException;
Also the error msg is inaccurate. It is not "blocked on reading", it is paused due to watermark alignment.
The issue is that the InterruptedException is not declared as throwable, so we can not throw it...
if (pausedSplits.contains(currentSplitId)) {
  // Wait until the reader is unblocked. Wake every second to catch any missed signal.
  // Return empty records if wakeUp is called, so pauseOrResumeSplits could be processed.
  boolean first = true;
It seems that the logic can be significantly simpler if we use unlimited wait(): simply return an empty result upon wakeUp.wait().
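(Spelled out, the suggested simpler variant would look roughly like this sketch, reusing the PR's names; this is an illustration, not code from the thread:)

if (pausedSplits.contains(currentSplitId)) {
  synchronized (wakeUp) {
    try {
      // Unbounded wait; relies entirely on wakeUp() notifying the monitor.
      wakeUp.wait();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IOException("Interrupted while paused.", e);
    }
  }

  // Woken up: return an empty batch so pauseOrResumeSplits can be processed.
  return new RecordsBySplits(Collections.emptyMap(), Collections.emptySet());
}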
I am generally very, very cautious about using loop-less blocking calls.
Here is an example:
- WT (Watermark Thread) - Watermark arrives, and the decision is to block this reader
- WT - wakeUp is called
- ST (SplitReader Thread) - Current fetch finished
- ST - pauseOrResumeSplits is called
- ST - New fetch called
- ST - The new fetch checks whether the split is paused, finds that we need to block, but does not start to wait on the lock yet
- WT - A new watermark arrives, and the decision is to unblock the reader
- WT - wakeUp is called
- ST - If the fetch arrives at the locking code now, it will not receive the wakeUp call
The main issue is that the wakeUp and the pauseOrResumeSplits are not atomic; we need another flag to store whether the given wakeUp has been handled or not.
So I think we minimally need to have something like this:
if (pausedSplits.contains(currentSplitId)) {
  LOG.info("Reader {} paused reading split {}", indexOfSubtask, currentSplitId);
  synchronized (wakeUp) {
    if (wakeUp.get()) {
      wakeUp.set(false);
      return new RecordsBySplits(Collections.emptyMap(), Collections.emptySet());
    }

    try {
      wakeUp.wait();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new RuntimeException("Interrupted while blocked on reading.", e);
    }
  }
}
I think compared to this the added safety of the while loop is a good compromise.
WDYT?
I think we need the while loop, purely because there could be spurious wakeups and we may have to retry if wakeUp == false. I think the code snippet you posted above @pvary is a good compromise if you replace the if with a while.
@mxm According to the API contract, it is OK to return an empty result in case of spurious wakeups. That is what the current PR does already:
if (wakeUp.get()) {
  wakeUp.set(false);
  return new RecordsBySplits(Collections.emptyMap(), Collections.emptySet());
}
@pvary I am fine with the while loop and using a wait timeout. I understand it is safer for avoiding a potential deadlock.
“The main issue is that the wakeUp and the pauseOrResumeSplits is not atomic, we need another flag to store if the given wakeUp is handled, or not.”
I see the problem now from the timeline example you gave earlier.
The code snippet above and the one proposed by the PR only differ because I wanted to add logging to the blocking path, but did not want to swamp the log with a new message every second. If you think that the logging is not important, we can simplify the code; the decision is then between accepting that we log a line every second on INFO level until the SplitReader is unblocked, or accepting that every fetch() loop creates a new boolean variable which is not used in 90 percent of the cases.
On the hot path I prefer optimized code to simpler code, but the difference is probably not too noticeable anyway.
From the timeline example above, it indicates a condition where the reader thread can be unnecessarily blocked for 1s, which is not ideal. Trying to think if there is any way we can avoid this suboptimal behavior.
The timeline is only valid if we do not use an extra flag to store the wakeUp state and rely on a simple wait-notify pattern.
I used this timeline to describe why I introduced the wakeUp flag, and use synchronization around it in the PR.
I make sure that the wakeUp flag is only read/set inside a synchronized block, and we return from the fetch while holding the synchronization lock. So if there is a concurrent wakeUp method call, it will be blocked until we return, and we handle the next wakeup in the next fetch loop.
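(For reference, the wakeUp() side of this protocol would be something like the following sketch, assuming wakeUp is an AtomicBoolean that doubles as the monitor object, matching the fetch() snippets in this thread:)

@Override
public void wakeUp() {
  synchronized (wakeUp) {
    // Record the signal before notifying: the flag survives even if fetch() is
    // not yet waiting on the monitor, which closes the race in the timeline above.
    wakeUp.set(true);
    wakeUp.notifyAll();
  }
}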
“I wanted to add logging to the blocking path, but do not want to swamp the log with a new message every second.”
I think it is good to log this as TRACE only. We don't really need an INFO for the first instance, because we already have an INFO log in the pauseOrResumeSplits method.
It seems that we might just need this while loop:
while (pausedSplits.contains(currentSplitId)) {
  LOG.trace("Reader {} is still paused reading split {}", indexOfSubtask, currentSplitId);
  synchronized (wakeUp) {
    if (wakeUp.get()) {
      wakeUp.set(false);
      return new RecordsBySplits(Collections.emptyMap(), Collections.emptySet());
    }

    try {
      wakeUp.wait(1000);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IOException("Interrupted while paused due to watermark alignment.", e);
    }
  }
}
Looks good to me.
  wakeUp.wait(1000);
} catch (InterruptedException e) {
  Thread.currentThread().interrupt();
  throw new RuntimeException("Interrupted while paused due to watermark alignment.", e);
@pvary sorry, I misspoke. I meant IOException instead of InterruptedException.
Understandable. The fetch javadoc says: “or it can just throw an interrupted exception”.
Set the type of the thrown exception to IOException.
 @Override
-public void wakeUp() {}
+public void wakeUp() {
Should we wake up if there are no pending splits? I'm trying to reason about the previous behavior with wakeUp being a no-op. I suppose wakeUp was a no-op because the source only handles one split, so fetch and handleSplitsChanges never occur together.
I guess it is okay, since the spurious wakeup condition is caught in the fetch loop. And this way you can avoid synchronization on pausedSplits.
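(A sketch of that single-threaded bookkeeping, assumed from the discussion, with pausedSplits as a plain Set<String> touched only by the fetcher thread:)

@Override
public void pauseOrResumeSplits(
    Collection<IcebergSourceSplit> splitsToPause, Collection<IcebergSourceSplit> splitsToResume) {
  LOG.info("Pause splits {} and resume splits {}", splitsToPause, splitsToResume);
  // Runs on the fetcher thread, like fetch(), so no synchronization is needed.
  splitsToPause.forEach(split -> pausedSplits.add(split.splitId()));
  splitsToResume.forEach(split -> pausedSplits.remove(split.splitId()));
}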
Thanks @stevenzwu, @mas-chen, @nastra, @mxm and @rodmeneses for all the patience and the useful comments!
Based on the discussion with @mas-chen and @stevenzwu, there is no need to implement split-level watermark alignment for the Iceberg source, because the IcebergSourceSplitReader only reads splits sequentially. When waiting for watermark alignment, the SourceOperator will stop processing and recycling the fetched batches. This exhausts the {@link ArrayPoolDataIteratorBatcher#pool} and the currentReader.next() call will be blocked even without split-level watermark alignment. Based on this, the pauseOrResumeSplits and the wakeUp are left empty.
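(Put together, the outcome described here matches the PR title: both methods end up as no-ops. A sketch, with the signatures taken from Flink's SplitReader interface:)

@Override
public void pauseOrResumeSplits(
    Collection<IcebergSourceSplit> splitsToPause, Collection<IcebergSourceSplit> splitsToResume) {
  // Intentionally empty: while the SourceOperator waits for alignment it stops
  // recycling batches, the batch pool drains, and currentReader.next() blocks.
}

@Override
public void wakeUp() {
  // Intentionally empty: fetch() never blocks on a monitor, so there is nothing to wake.
}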