WakeUp with empty result
Peter Vary committed Dec 17, 2023
1 parent 7534f5a commit 3d92980
Showing 3 changed files with 64 additions and 23 deletions.
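
The gist of the change: wakeUp() used to clear pausedSplits and notify a bare Object monitor, so a wake-up silently resumed reading. Now wakeUp() only raises an AtomicBoolean flag (which doubles as the monitor) and notifies the waiter; the blocked fetch() sees the flag and returns an empty RecordsBySplits, handing control back to the fetcher thread so a queued pauseOrResumeSplits call can run. A minimal, self-contained sketch of the same wait/wake-up pattern — not Iceberg code, all names invented:

import java.util.concurrent.atomic.AtomicBoolean;

// Sketch of the flag-as-monitor pattern from the commit (invented names).
class PausableFetcherSketch {
  private final AtomicBoolean wakeUp = new AtomicBoolean(false);
  private volatile boolean paused = true;

  // Fetcher thread: blocks while paused, but returns an "empty" result
  // as soon as wakeUp() is called, so queued tasks get a chance to run.
  String fetch() throws InterruptedException {
    while (paused) {
      synchronized (wakeUp) {
        if (wakeUp.get()) {
          wakeUp.set(false);
          return "";              // stands in for the empty RecordsBySplits
        }
        wakeUp.wait(1000);        // wake every second in case a signal was missed
      }
    }
    return "records";
  }

  // Another thread: does not resume the split, only interrupts the waiting fetch.
  void wakeUp() {
    synchronized (wakeUp) {
      wakeUp.set(true);
      wakeUp.notifyAll();
    }
  }

  void resume() {
    paused = false;
  }
}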

First changed file: IcebergSourceSplitReader.java

@@ -25,6 +25,7 @@
 import java.util.List;
 import java.util.Queue;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 import org.apache.flink.api.connector.source.SourceReaderContext;
 import org.apache.flink.connector.base.source.reader.RecordsBySplits;
@@ -51,7 +52,7 @@ class IcebergSourceSplitReader<T> implements SplitReader<RecordAndPosition<T>, IcebergSourceSplit> {
   private final int indexOfSubtask;
   private final Queue<IcebergSourceSplit> splits;
   private final Set<String> pausedSplits;
-  private final Object signal = new Object();
+  private final AtomicBoolean wakeUp = new AtomicBoolean(false);

   private CloseableIterator<RecordsWithSplitIds<RecordAndPosition<T>>> currentReader;
   private IcebergSourceSplit currentSplit;
@@ -87,22 +88,32 @@ public RecordsWithSplitIds<RecordAndPosition<T>> fetch() throws IOException {
     }

     if (currentReader.hasNext()) {
-      // Wait until the reader is blocked. Wake every second in case this missed a signal
-      boolean first = true;
-      while (pausedSplits.contains(currentSplitId)) {
-        if (first) {
-          LOG.info("Paused reading {}", currentSplitId);
-        } else {
-          LOG.trace("Still paused {}", currentSplitId);
-        }
+      if (pausedSplits.contains(currentSplitId)) {
+        // Wait until the reader is blocked. Wake every second to catch any missed signal.
+        // Return empty records if wakeUp is called, so pauseOrResumeSplits could be processed.
+        boolean first = true;
+
+        while (pausedSplits.contains(currentSplitId)) {
+          if (first) {
+            LOG.info("Reader {} paused reading split {}", indexOfSubtask, currentSplitId);
+            first = false;
+          } else {
+            LOG.trace("Reader {} is still paused reading split {}", indexOfSubtask, currentSplitId);
+          }

-        try {
-          synchronized (signal) {
-            signal.wait(1000);
+          synchronized (wakeUp) {
+            if (wakeUp.get()) {
+              wakeUp.set(false);
+              return new RecordsBySplits(Collections.emptyMap(), Collections.emptySet());
+            }
+
+            try {
+              wakeUp.wait(1000);
+            } catch (InterruptedException e) {
+              Thread.currentThread().interrupt();
+              throw new RuntimeException("Interrupted while blocked on reading.", e);
+            }
           }
-        } catch (InterruptedException e) {
-          Thread.currentThread().interrupt();
-          throw new RuntimeException("Interrupted while blocked on reading.", e);
         }
       }

@@ -140,10 +151,10 @@ public void handleSplitsChanges(SplitsChange<IcebergSourceSplit> splitsChange) {

   @Override
   public void wakeUp() {
-    LOG.info("WakeUp called");
-    pausedSplits.clear();
-    synchronized (signal) {
-      signal.notify();
+    LOG.info("WakeUp reader {}", indexOfSubtask);
+    synchronized (wakeUp) {
+      wakeUp.set(true);
+      wakeUp.notifyAll();
     }
   }

@@ -158,14 +169,15 @@ public void close() throws Exception {
   @Override
   public void pauseOrResumeSplits(
       Collection<IcebergSourceSplit> splitsToPause, Collection<IcebergSourceSplit> splitsToResume) {
-    LOG.info("Pause splits: {} and resume splits: {}", splitsToPause, splitsToResume);
+    LOG.info(
+        "For reader {}, pause splits {} and resume splits {}",
+        indexOfSubtask,
+        splitsToPause,
+        splitsToResume);
     pausedSplits.addAll(
         splitsToPause.stream().map(IcebergSourceSplit::splitId).collect(Collectors.toSet()));
     pausedSplits.removeAll(
         splitsToResume.stream().map(IcebergSourceSplit::splitId).collect(Collectors.toSet()));
-    synchronized (signal) {
-      signal.notify();
-    }
   }

   private long calculateBytes(IcebergSourceSplit split) {
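
For context on why returning an empty batch matters: in Flink's fetcher framework, fetch() runs on a dedicated fetcher thread, and operations such as pauseOrResumeSplits are queued as tasks that only execute between fetch() calls; wakeUp() is the framework's documented hook for interrupting a blocking fetch. The loop below is a simplified, invented illustration of that model, not Flink's actual SplitFetcher code:

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Supplier;

// Invented illustration of a SplitFetcher-style loop (not Flink code).
class FetcherLoopSketch {
  private final Queue<Runnable> taskQueue = new ConcurrentLinkedQueue<>();
  private volatile boolean running = true;

  // pauseOrResumeSplits-style operations are queued here by other threads.
  void enqueueTask(Runnable task) {
    taskQueue.add(task);
  }

  void runLoop(Supplier<String> fetch) {
    while (running) {
      Runnable task = taskQueue.poll();
      if (task != null) {
        // Only reachable between fetch() calls: a fetch that blocks forever
        // while its split is paused would starve these queued tasks.
        task.run();
      } else {
        // With this commit, a paused fetch returns an empty result on wakeUp(),
        // so control comes back here and the queued pause/resume is processed.
        String records = fetch.get();
        if (!records.isEmpty()) {
          deliver(records);
        }
      }
    }
  }

  private void deliver(String records) {
    // hand records to the element queue (omitted in this sketch)
  }

  void stop() {
    running = false;
  }
}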

Second changed file: a streaming-read IcebergSource test in org.apache.iceberg.flink.source (filename not captured)

@@ -19,6 +19,7 @@
 package org.apache.iceberg.flink.source;

 import static org.apache.flink.connector.testframe.utils.ConnectorTestConstants.DEFAULT_COLLECT_DATA_TIMEOUT;
+import static org.apache.iceberg.flink.FlinkConfigOptions.SOURCE_READER_FETCH_BATCH_RECORD_COUNT;
 import static org.assertj.core.api.AssertionsForClassTypes.assertThat;

 import java.io.Serializable;
@@ -382,6 +383,8 @@ protected IcebergSource<RowData> source() {
         .streaming(true)
         .monitorInterval(Duration.ofMillis(2))
         .streamingStartingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL)
+        // Read in single-row batches to have more batches for testing
+        .flinkConfig(new Configuration().set(SOURCE_READER_FETCH_BATCH_RECORD_COUNT, 1))
         .build();
   }


Third changed file: the split reader's wake-up test (filename not captured)

@@ -22,9 +22,14 @@
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;

+import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
 import org.apache.flink.connector.testutils.source.reader.TestingReaderContext;
@@ -39,6 +44,7 @@
 import org.apache.iceberg.flink.source.split.SplitComparators;
 import org.apache.iceberg.hadoop.HadoopFileIO;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.util.concurrent.MoreExecutors;
 import org.apache.iceberg.types.Types;
 import org.awaitility.Awaitility;
 import org.awaitility.core.ConditionTimeoutException;
@@ -106,14 +112,34 @@ public void testWakeup() throws Exception {
     // Pause the reading of the split, and check that the read is blocked when trying to fetch new
     // records
     reader.pauseOrResumeSplits(ImmutableList.of(split), ImmutableList.of());

     assertThatThrownBy(
             () ->
                 Awaitility.await().atMost(2, TimeUnit.SECONDS).until(() -> reader.fetch() != null))
         .isInstanceOf(ConditionTimeoutException.class)
         .hasMessageContaining("was not fulfilled within");

+    // Run the next fetch and wakeUp concurrently. Expect an empty result from the blocked fetch.
+    ExecutorService executorService =
+        MoreExecutors.getExitingExecutorService(
+            (ThreadPoolExecutor) Executors.newFixedThreadPool(1));
+
+    AtomicBoolean emptyResult = new AtomicBoolean(false);
+    executorService.execute(
+        () -> {
+          try {
+            emptyResult.set(reader.fetch().nextSplit() == null);
+          } catch (IOException e) {
+            throw new RuntimeException(e);
+          }
+        });
+
     // Wakeup the reading of the split, and check the result size
     reader.wakeUp();
+    Awaitility.await().atMost(2, TimeUnit.SECONDS).until(() -> emptyResult.get());
+
+    // Unpause and read the data in the next fetch
+    reader.pauseOrResumeSplits(ImmutableList.of(), ImmutableList.of(split));
     result = (ArrayBatchRecords) reader.fetch();
     assertThat(result.numberOfRecords()).isEqualTo(3);
   }
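
The shape of the updated test — start the potentially blocking call on its own executor, fire the signal, then wait on a flag with Awaitility — generalizes to any "blocked call must return after a signal" assertion. A condensed, self-contained sketch with invented names (the real test uses Guava's MoreExecutors.getExitingExecutorService, which makes the pool's threads daemon so they cannot keep the JVM alive):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.awaitility.Awaitility;

// Generic shape of the wake-up assertion above (invented names, not Iceberg code).
class BlockedCallTestSketch {
  static void assertUnblocks(Runnable blockingCall, Runnable signal) {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    AtomicBoolean returned = new AtomicBoolean(false);
    executor.execute(
        () -> {
          blockingCall.run();   // e.g. reader.fetch() while its split is paused
          returned.set(true);
        });

    signal.run();               // e.g. reader.wakeUp()

    // Fails with ConditionTimeoutException if the call stays blocked.
    Awaitility.await().atMost(2, TimeUnit.SECONDS).until(returned::get);
    executor.shutdownNow();
  }
}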