
Flink: Empty implementation for pauseOrResumeSplits to prevent UnsupportedOperationException #9308

Merged
merged 1 commit into from
Dec 21, 2023

Conversation

pvary
Contributor

@pvary pvary commented Dec 15, 2023

Based on the discussion with @mas-chen and @stevenzwu, there is no need to implement split-level watermark alignment for the Iceberg source, because the IcebergSourceSplitReader only reads splits sequentially. When waiting for watermark alignment, the SourceOperator stops processing and recycling the fetched batches. This exhausts the {@link ArrayPoolDataIteratorBatcher#pool} and the currentReader.next() call blocks even without split-level watermark alignment. Based on this, pauseOrResumeSplits and wakeUp are left empty.
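The merged change boils down to two no-op overrides. The following is a hypothetical, self-contained sketch of the idea (`Split` is a placeholder for IcebergSourceSplit, and the real class implements Flink's SplitReader interface, which is omitted here), not the merged code itself:

```java
import java.util.Collection;
import java.util.Collections;

// Hypothetical, self-contained sketch of the merged no-op overrides.
// `Split` stands in for IcebergSourceSplit; the Flink SplitReader interface
// the real class implements is omitted so the sketch compiles on its own.
class SplitReaderSketch<Split> {
    // No-op: splits are read sequentially, and the exhausted batch pool
    // already blocks currentReader.next() during watermark alignment.
    public void pauseOrResumeSplits(
            Collection<Split> splitsToPause, Collection<Split> splitsToResume) {}

    // No-op: there is no split-level blocking state to interrupt.
    public void wakeUp() {}

    public static void main(String[] args) {
        SplitReaderSketch<String> reader = new SplitReaderSketch<>();
        // Neither call throws UnsupportedOperationException anymore.
        reader.pauseOrResumeSplits(Collections.emptyList(), Collections.singletonList("split-0"));
        reader.wakeUp();
        System.out.println("ok");
    }
}
```

The point of the empty bodies is only to satisfy the SplitReader contract without the default UnsupportedOperationException; the actual blocking happens in the batch pool.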

@pvary
Contributor Author

pvary commented Dec 15, 2023

CC: @ajantha-bhat - further fixes on the test

@@ -63,6 +63,7 @@ project(":iceberg-flink:iceberg-flink-${flinkMajorVersion}") {
// These artifacts are shaded and included in the orc-core fat jar
exclude group: 'com.google.protobuf', module: 'protobuf-java'
exclude group: 'org.apache.hive', module: 'hive-storage-api'
exclude group: 'org.slf4j'
Contributor


out of curiosity: why was this needed?

Contributor Author


We specify slf4j 2.x as an Iceberg dependency, but these dependencies carry a transitive slf4j 1.x dependency. When both 1.x and 2.x slf4j are on the classpath, the loggers do not work, so it is very hard to follow what happens during the tests.

Contributor

@rodmeneses rodmeneses left a comment


some minor comments, otherwise LGTM

@stevenzwu
Contributor

stevenzwu commented Dec 15, 2023

@pvary is the whole PR addressing the flaky test? if not, please separate out the flaky test part (hopefully small) into a separate PR. I would like to merge the flaky test fix ASAP. And you can include all 3 versions (1.16, 1.17, 1.18) in one PR.

@pvary
Contributor Author

pvary commented Dec 15, 2023

@pvary is the whole PR addressing the flaky test? if not, please separate out the flaky test part (hopefully small) into a separate PR. I would like to merge the flaky test fix ASAP. And you can include all 3 versions (1.16, 1.17, 1.18) in one PR.

As mentioned in the description, the missing pauseOrResumeSplits method causes the flakiness. I think it is better to fix it in one PR than to have one PR disabling the test and another PR adding the test back. WDYT?

Contributor

@stevenzwu stevenzwu left a comment


@pvary we are changing some critical code here. I would prefer we take a little more time here and get more people to review.

Hence I would prefer if we disable/ignore the test first so that we make other PR/CI builds stable ASAP.

@pvary
Contributor Author

pvary commented Dec 17, 2023

@pvary we are changing some critical code here. I would prefer we take a little more time here and get more people to review.

Hence I would prefer if we disable/ignore the test first so that we make other PR/CI builds stable ASAP.

That is why I created #9309. That contains the most important other fixes.

@pvary
Contributor Author

pvary commented Dec 17, 2023

@stevenzwu: Here is what I have found:
wakeUp is called whenever there is a new message concerning the SplitReader:

  • new split added/removed
  • split paused/resumed

These messages are executed in the same thread as the fetch method, so in this case we have to return from the fetch loop, even with empty results. This is somewhat concerning to me considering the comment here:

// return an empty result, which will lead to split fetch to be idle.
// SplitFetcherManager will then close idle fetcher.
return new RecordsBySplits(Collections.emptyMap(), Collections.emptySet());

I do not see that it is closed, but probably you know more...

@pvary
Contributor Author

pvary commented Dec 18, 2023

These messages are executed in the same thread as the fetch method, so in this case we have to return from the fetch loop, even with empty results. This is somewhat concerning to me considering the comment here:

I have checked, and the idleness is defined like this:
https://github.com/apache/flink/blob/1d33773e6b7f9f76f03ff8ffd73171b95fa24ccb/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java#L338-L345

    boolean isIdle() {
        lock.lock();
        try {
            return assignedSplits.isEmpty() && taskQueue.isEmpty() && runningTask == null;
        } finally {
            lock.unlock();
        }
    }

So if we have tasks (split manipulation/pause), then we are not idle, so the Fetcher is not removed.

The Fetcher also handles the wakeUp flag, so we are golden:
https://github.com/apache/flink/blob/c28dd942e3dded4cab7a33037077a1d3bfa0929e/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/FetchTask.java#L61-L71

            if (!isWakenUp()) {
                // The order matters here. We must first put the last records into the queue.
                // This ensures the handling of the fetched records is atomic to wakeup.
                if (elementsQueue.put(fetcherIndex, lastRecords)) {
                    if (!lastRecords.finishedSplits().isEmpty()) {
                        // The callback does not throw InterruptedException.
                        splitFinishedCallback.accept(lastRecords.finishedSplits());
                    }
                    lastRecords = null;
                }
            }

// Unpause the reading of the split, and check the result size
reader.pauseOrResumeSplits(ImmutableList.of(), ImmutableList.of(split));
result = (ArrayBatchRecords) reader.fetch();
assertThat(result.numberOfRecords()).isEqualTo(3);
Contributor


Suggested change
assertThat(result.numberOfRecords()).isEqualTo(3);
assertThat(result.numberOfRecords()).isGreaterThan(2);

For test stability, I think fetch may not be guaranteed to return 1 record after pausing.

Contributor Author


The reader reads the records in batches. The size of the batches is governed by:

  public static final ConfigOption<Integer> SOURCE_READER_FETCH_BATCH_RECORD_COUNT =
      ConfigOptions.key("table.exec.iceberg.fetch-batch-record-count")
          .intType()
          .defaultValue(2048)
          .withDescription("The target number of records for Iceberg reader fetch batch.");

The default is 2048, so all of the records are read here
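Since the whole split fits in a single default-sized batch, one fetch returns all 3 records. A minimal, hypothetical sketch of the batch partitioning (BatchingSketch is an illustrative stand-in, not the actual ArrayPoolDataIteratorBatcher):

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative stand-in for the batching behavior: records are grouped into
// batches of at most batchSize, so with the default
// table.exec.iceberg.fetch-batch-record-count of 2048, a 3-record split is
// returned by a single fetch.
class BatchingSketch {
    static <T> List<List<T>> toBatches(List<T> records, int batchSize) {
        List<List<T>> batches = new ArrayList<>();
        for (int i = 0; i < records.size(); i += batchSize) {
            batches.add(new ArrayList<>(records.subList(i, Math.min(i + batchSize, records.size()))));
        }
        return batches;
    }

    public static void main(String[] args) {
        // Three records, default batch size: everything arrives in one batch.
        System.out.println(toBatches(List.of(1, 2, 3), 2048)); // prints [[1, 2, 3]]
    }
}
```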

// Unpause and read the data in the next fetch
reader.pauseOrResumeSplits(ImmutableList.of(), ImmutableList.of(split));
result = (ArrayBatchRecords) reader.fetch();
assertThat(result.numberOfRecords()).isEqualTo(3);
Contributor


ditto same comment here

Contributor Author


We will get all 3 records

@mas-chen
Contributor

BTW, should we add an integration test?

@pvary
Contributor Author

pvary commented Dec 18, 2023

BTW, should we add an integration test?

That reminds me to remove the set(PipelineOptions.ALLOW_UNALIGNED_SOURCE_SPLITS, true) from the TestIcebergSourceWithWatermarkExtractor test. That is how I found this issue.

The TestIcebergSourceWithWatermarkExtractor.testThrottling was slightly flaky because of this change (1 out of 1000), but I do not see any other good way to integration test it.

@@ -383,8 +381,10 @@ protected IcebergSource<RowData> source() {
.project(TestFixtures.TS_SCHEMA)
.splitSize(100L)
.streaming(true)
.monitorInterval(Duration.ofMillis(2))
.monitorInterval(Duration.ofMillis(100))
Contributor Author


Added this because the 2 ms interval was causing too much load and ended up causing flakiness:

[collect-sink-operator-coordinator-executor-thread-pool-thread-1] WARN org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorCoordinator - Collect sink coordinator encounters a CompletionException: java.net.SocketTimeoutException: Read timed out

Contributor


2ms may cause high cpu usage. but why would it cause flakiness?

let's change it to 10 ms then. smaller value can discover new files sooner and make the test run faster. we already have a lot of integration tests using 10 ms and they are running fine.

Contributor Author

@pvary pvary Dec 19, 2023


I ran the tests throughout the whole day yesterday, and found that the previous fix removed most of the flakiness, but we still have some issues which fail around every 1000-2000 runs.

In the failure cases the records are emitted by the emitter, but none of them arrive at the Sink. I see the above error in the logs in this case: SocketTimeout on the sink coordinator, and retries - this appears multiple times before the timeout. Also the log is swamped with the planner logs.

After increasing the monitoring interval, I had 3500 successful runs in a row, when I decided that I need to spend my time elsewhere. As you can imagine this is nothing conclusive, and given the failure rate, nothing that could be tracked easily, but it seems like a good improvement.

wakeUp.wait(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("Interrupted while blocked on reading.", e);
Contributor


is it correct to throw RuntimeException? why don't we just propagate the InterruptedException per the contract?

     * @throws IOException when encountered IO errors, such as deserialization failures.
     */
    RecordsWithSplitIds<E> fetch() throws IOException;

also the error msg is inaccurate. it is not blocked on reading. it is paused due to watermark alignment

Contributor Author


The issue is that InterruptedException is not declared as throwable, so we cannot throw it...


if (pausedSplits.contains(currentSplitId)) {
// Wait until the reader is unblocked. Wake every second to catch any missed signal.
// Return empty records if wakeUp is called, so pauseOrResumeSplits could be processed.
boolean first = true;
Contributor


it seems that the logic can be significantly simpler if we use unlimited wait(). simply return an empty result upon wakeUp.wait()

Contributor Author


I am generally very cautious about using loop-less blocking calls.
Here is an example:

  • WT (Watermark Thread) - Watermark arrives, and the decision is to block this reader
  • WT - wakeup is called
  • ST (SplitReader Thread) - Current fetch finished
  • ST - pauseOrResumeSplits is called
  • ST - New fetch called
  • ST - New fetch checks if the split is paused, finds that we need to block, but does not start to wait on the lock yet.
  • WT - New watermark arrives, and the decision is to unblock the reader
  • WT - wakeup is called
  • ST - If the fetch arrives at the locking code now, it will not receive the wakeup call

The main issue is that the wakeUp and the pauseOrResumeSplits calls are not atomic; we need another flag to store whether the given wakeUp is handled or not.

So I think we minimally need to have something like this:

      if (pausedSplits.contains(currentSplitId)) {
          LOG.info("Reader {} paused reading split {}", indexOfSubtask, currentSplitId);
          synchronized (wakeUp) {
            if (wakeUp.get()) {
              wakeUp.set(false);
              return new RecordsBySplits(Collections.emptyMap(), Collections.emptySet());
            }

            try {
              wakeUp.wait();
            } catch (InterruptedException e) {
              Thread.currentThread().interrupt();
              throw new RuntimeException("Interrupted while blocked on reading.", e);
            }
          }
        }

I think, compared to this, the added safety of the while loop is a good compromise.
WDYT?


I think we need the while loop, purely because there could be spurious wakeups and we may have to retry if wakeUp==false. I think the code snippet above you posted @pvary is a good compromise if you replace the if with a while.

Contributor


@mxm According to the API contract, it is ok to return empty result in case of spurious wakeups. that is what the current PR does already.

            if (wakeUp.get()) {
              wakeUp.set(false);
              return new RecordsBySplits(Collections.emptyMap(), Collections.emptySet());
            }

Contributor

@stevenzwu stevenzwu Dec 19, 2023


@pvary I am fine with while-loop and using wait timeout. I understand it is safer for avoiding potential deadlock.

The main issue is that the wakeUp and the pauseOrResumeSplits is not atomic, we need another flag to store if the given wakeUp is handled, or not.

I see the problem now from the timeline example you said earlier.

Contributor Author


The code snippet above and the one proposed by the PR differ only because I wanted to add logging to the blocking path without swamping the log with a new message every second. If we decide that the logging is not important, or we accept logging a line every second on INFO level until the SplitReader is unblocked, then we can simplify the code. Otherwise we accept that every fetch() loop creates a new boolean variable which is not used in 90 percent of the cases.

On the hot path I prefer optimized code to simpler code, but the difference is probably not too noticeable anyway.

Contributor


From the timeline example above, there is a condition where the reader thread can be unnecessarily blocked for 1s, which is not ideal. Trying to think if there is any way we can avoid this suboptimal behavior.

Contributor Author


The timeline is only valid if we do not use an extra flag to store the wakeUp state and rely on a simple wait-notify pattern.

I used this timeline to describe why I introduced the wakeUp flag, and use synchronize around it in the PR.
I make sure that the wakeUp flag is only read/set inside a synchronized block, and we return from the fetch while holding the synchronization lock. So if there is a concurrent wakeUp method call, it will be blocked until we return, and we handle the next wakeup in the next fetch loop
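The invariant above can be demonstrated in isolation. The following is a hypothetical, Flink-free mimic (PausedFetchSketch is illustrative; pausedSplits and currentSplitId are simplified to a fixed set and id), showing that a wakeUp arriving before fetch() takes the lock is still observed via the flag:

```java
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;

// Self-contained mimic of the handshake (not the actual IcebergSourceSplitReader):
// the wakeUp flag is read and reset only inside the synchronized block, so a
// notify that fires before fetch() reaches the lock is never lost.
class PausedFetchSketch {
    private final AtomicBoolean wakeUp = new AtomicBoolean(false);
    private final Set<String> pausedSplits;
    private final String currentSplitId = "split-0";

    PausedFetchSketch(boolean paused) {
        this.pausedSplits = paused ? Set.of("split-0") : Collections.emptySet();
    }

    // Returns "empty" when unblocked by wakeUp, "records" when not paused.
    String fetch() {
        while (pausedSplits.contains(currentSplitId)) {
            synchronized (wakeUp) {
                if (wakeUp.get()) {
                    wakeUp.set(false);
                    return "empty"; // let pauseOrResumeSplits run on this thread
                }
                try {
                    wakeUp.wait(1000); // bounded wait guards against missed signals
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return "interrupted";
                }
            }
        }
        return "records";
    }

    void wakeUp() {
        synchronized (wakeUp) {
            wakeUp.set(true);
            wakeUp.notify();
        }
    }

    public static void main(String[] args) {
        PausedFetchSketch paused = new PausedFetchSketch(true);
        paused.wakeUp(); // the "missed notify" timeline: signal fires before fetch() locks
        System.out.println(paused.fetch()); // flag check still unblocks us: prints "empty"
    }
}
```

Because the flag is checked under the same lock that wakeUp() takes, the timeline where the notify fires between the pause check and the wait() cannot lose the signal.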

Contributor


I wanted to add logging to the blocking path, but do not want to swamp the log with a new message every second.

I think it is good to log this as TRACE only. We don't really need an INFO for the first instance, because we already have an INFO log in the pauseOrResumeSplits method.

it seems that we might just need this while loop

while (pausedSplits.contains(currentSplitId)) {
  LOG.trace("Reader {} is still paused reading split {}", indexOfSubtask, currentSplitId);
  synchronized (wakeUp) {
    if (wakeUp.get()) {
      wakeUp.set(false);
      return new RecordsBySplits(Collections.emptyMap(), Collections.emptySet());
    }

    try {
      wakeUp.wait(1000);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IOException("Interrupted while paused due to watermark alignment.", e);
    }
  }
}


@mxm mxm left a comment


Looks good to me.


wakeUp.wait(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("Interrupted while paused due to watermark alignment.", e);
Contributor


@pvary sorry, I misspoke. I meant IOException instead of InterruptedException.

Contributor Author


Understandable. The fetch javadoc says: "or it can just throw an interrupted exception."
Set the type of the thrown exception to IOException.
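The agreed handling can be sketched in a self-contained way (InterruptSketch is illustrative; only the catch block mirrors the PR): a pending interrupt makes wait() throw immediately, the interrupt status is restored on the thread, and an IOException is surfaced, which fetch()'s contract allows.

```java
import java.io.IOException;

// Minimal sketch of the agreed contract: fetch() may only throw IOException,
// so an interrupt during the paused wait is re-asserted on the thread and
// surfaced as an IOException wrapping the original cause.
class InterruptSketch {
    private final Object wakeUp = new Object();

    String fetch() throws IOException {
        synchronized (wakeUp) {
            try {
                wakeUp.wait(10); // stands in for the paused-split wait
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore interrupt status
                throw new IOException("Interrupted while paused due to watermark alignment.", e);
            }
        }
        return "records";
    }

    public static void main(String[] args) {
        Thread.currentThread().interrupt(); // simulate an interrupt arriving during the pause
        try {
            new InterruptSketch().fetch();
        } catch (IOException expected) {
            // the interrupt status was restored before the IOException was thrown
            System.out.println("interrupted=" + Thread.interrupted());
        }
    }
}
```

Restoring the interrupt flag before wrapping is important: callers that inspect Thread.isInterrupted() still see the interrupt even though the checked type changed.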

@stevenzwu stevenzwu changed the title from "Flink: Support watermark alignment of source splits" to "Flink: implement pause and resume in IcebergSourceSplitReader" on Dec 19, 2023
@Override
public void wakeUp() {}
public void wakeUp() {
Contributor


Should we wake up if there are no pending splits? I'm trying to reason about the previous behavior with wakeUp being a noop. I suppose wakeUp was a noop because the source only handles one split, so fetch and handleSplitChanges never occur together.

Contributor


I guess it is okay, since the spurious wakeup condition is caught in the fetch loop. And this way you can avoid synchronization on pausedSplits

@pvary pvary changed the title from "Flink: implement pause and resume in IcebergSourceSplitReader" to "Flink: Empty implementation for pauseOrResumeSplits to prevent UnsupportedOperationException" on Dec 21, 2023
@pvary pvary merged commit 2eea697 into apache:main Dec 21, 2023
13 checks passed
@pvary pvary deleted the wm2 branch December 21, 2023 07:32
@pvary
Contributor Author

pvary commented Dec 21, 2023

Thanks @stevenzwu, @mas-chen, @nastra, @mxm and @rodmeneses for all the patience and the useful comments!

pvary pushed a commit to pvary/iceberg that referenced this pull request Jan 3, 2024
lisirrx pushed a commit to lisirrx/iceberg that referenced this pull request Jan 4, 2024
geruh pushed a commit to geruh/iceberg that referenced this pull request Jan 26, 2024
devangjhabakh pushed a commit to cdouglas/iceberg that referenced this pull request Apr 22, 2024