diff --git a/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java b/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java index 108757b4155c..d7221e7d4545 100644 --- a/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java +++ b/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java @@ -67,6 +67,12 @@ private ParallelIterator( try (Closeable ignored = (iterable instanceof Closeable) ? (Closeable) iterable : () -> {}) { for (T item : iterable) { + // exit manually because `ConcurrentLinkedQueue` can't be + // interrupted + if (closed) { + return; + } + queue.add(item); } } catch (IOException e) { diff --git a/core/src/test/java/org/apache/iceberg/util/TestParallelIterable.java b/core/src/test/java/org/apache/iceberg/util/TestParallelIterable.java index 68685614d3da..3f121636c5b5 100644 --- a/core/src/test/java/org/apache/iceberg/util/TestParallelIterable.java +++ b/core/src/test/java/org/apache/iceberg/util/TestParallelIterable.java @@ -23,6 +23,7 @@ import java.io.IOException; import java.lang.reflect.Field; import java.util.Collections; +import java.util.Iterator; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutorService; @@ -72,6 +73,65 @@ public CloseableIterator iterator() { .untilAsserted(() -> assertThat(queue).isEmpty()); } + @Test + public void closeMoreDataParallelIteratorWithoutCompleteIteration() + throws IOException, IllegalAccessException, NoSuchFieldException { + ExecutorService executor = Executors.newFixedThreadPool(1); + Iterator integerIterator = + new Iterator() { + private int number = 1; + + @Override + public boolean hasNext() { + if (number > 1000) { + return false; + } + number++; + return true; + } + + @Override + public Integer next() { + try { + // sleep to control number generate rate + Thread.sleep(10); + } catch (InterruptedException e) { + // Sleep interrupted, we ignore it! + } + return number; + } + }; + Iterable> transform = + Iterables.transform( + Lists.newArrayList(1), + item -> + new CloseableIterable() { + @Override + public void close() {} + + @Override + public CloseableIterator iterator() { + return CloseableIterator.withClose(integerIterator); + } + }); + + ParallelIterable parallelIterable = new ParallelIterable<>(transform, executor); + CloseableIterator iterator = parallelIterable.iterator(); + Field queueField = iterator.getClass().getDeclaredField("queue"); + queueField.setAccessible(true); + ConcurrentLinkedQueue queue = (ConcurrentLinkedQueue) queueField.get(iterator); + + assertThat(iterator.hasNext()).isTrue(); + assertThat(iterator.next()).isNotNull(); + Awaitility.await("Queue is populated") + .atMost(5, TimeUnit.SECONDS) + .untilAsserted(() -> queueHasElements(iterator, queue)); + iterator.close(); + Awaitility.await("Queue is cleared") + .atMost(5, TimeUnit.SECONDS) + .untilAsserted(() -> assertThat(queue).as("Queue is not empty after cleaning").isEmpty()); + } + private void queueHasElements(CloseableIterator iterator, Queue queue) { assertThat(iterator.hasNext()).isTrue(); assertThat(iterator.next()).isNotNull();