diff --git a/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java b/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java index 108757b4155c..49242971b6ac 100644 --- a/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java +++ b/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java @@ -67,6 +67,11 @@ private ParallelIterator( try (Closeable ignored = (iterable instanceof Closeable) ? (Closeable) iterable : () -> {}) { for (T item : iterable) { + // we must exit manually because `ConcurrentLinkedQueue` can't be + // interrupted + if (closed) { + return; + } queue.add(item); } } catch (IOException e) { diff --git a/core/src/test/java/org/apache/iceberg/util/TestParallelIterable.java b/core/src/test/java/org/apache/iceberg/util/TestParallelIterable.java index 68685614d3da..421215fdfc81 100644 --- a/core/src/test/java/org/apache/iceberg/util/TestParallelIterable.java +++ b/core/src/test/java/org/apache/iceberg/util/TestParallelIterable.java @@ -22,7 +22,7 @@ import java.io.IOException; import java.lang.reflect.Field; -import java.util.Collections; +import java.util.Iterator; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutorService; @@ -40,10 +40,33 @@ public class TestParallelIterable { public void closeParallelIteratorWithoutCompleteIteration() throws IOException, IllegalAccessException, NoSuchFieldException { ExecutorService executor = Executors.newFixedThreadPool(1); + Iterator integerIterator = + new Iterator() { + private int number = 1; + @Override + public boolean hasNext() { + if (number > 1000) { + return false; + } + number++; + return true; + } + + @Override + public Integer next() { + try { + // sleep to control number generate rate + Thread.sleep(10); + } catch (InterruptedException e) { + System.out.println("Sleep interrupted, we ignore it!"); + } + return number; + } + }; Iterable> transform = Iterables.transform( - Lists.newArrayList(1, 2, 3, 4, 5), + Lists.newArrayList(1), item -> new CloseableIterable() { @Override @@ -51,7 +74,7 @@ public void close() {} @Override public CloseableIterator iterator() { - return CloseableIterator.withClose(Collections.singletonList(item).iterator()); + return CloseableIterator.withClose(integerIterator); } }); @@ -69,7 +92,7 @@ public CloseableIterator iterator() { iterator.close(); Awaitility.await("Queue is cleared") .atMost(5, TimeUnit.SECONDS) - .untilAsserted(() -> assertThat(queue).isEmpty()); + .untilAsserted(() -> assertThat(queue).as("Queue is not empty after cleaning").isEmpty()); } private void queueHasElements(CloseableIterator iterator, Queue queue) {