diff --git a/core/trino-main/src/test/java/io/trino/spiller/TestFileSingleStreamSpiller.java b/core/trino-main/src/test/java/io/trino/spiller/TestFileSingleStreamSpiller.java index 4517965e09a1..de2f8b411f4e 100644 --- a/core/trino-main/src/test/java/io/trino/spiller/TestFileSingleStreamSpiller.java +++ b/core/trino-main/src/test/java/io/trino/spiller/TestFileSingleStreamSpiller.java @@ -27,13 +27,11 @@ import io.trino.spi.block.TestingBlockEncodingSerde; import io.trino.spi.type.Type; import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInstance; import org.junit.jupiter.api.parallel.Execution; import java.io.File; -import java.io.IOException; import java.io.InputStream; import java.nio.file.Files; import java.util.Iterator; @@ -65,21 +63,12 @@ public class TestFileSingleStreamSpiller private static final List TYPES = ImmutableList.of(BIGINT, DOUBLE, VARBINARY); private final ListeningExecutorService executor = listeningDecorator(newCachedThreadPool()); - private File spillPath; - - @BeforeAll - public void setUp() - throws IOException - { - spillPath = Files.createTempDirectory("tmp").toFile(); - } @AfterAll public void tearDown() throws Exception { executor.shutdown(); - deleteRecursively(spillPath.toPath(), ALLOW_INSECURE); } @Test @@ -113,62 +102,68 @@ public void testSpillEncryptionWithCompression() private void assertSpill(CompressionCodec compressionCodec, boolean encryption) throws Exception { - FileSingleStreamSpillerFactory spillerFactory = new FileSingleStreamSpillerFactory( - executor, // executor won't be closed, because we don't call destroy() on the spiller factory - new TestingBlockEncodingSerde(), - new SpillerStats(), - ImmutableList.of(spillPath.toPath()), - 1.0, - compressionCodec, - encryption); - LocalMemoryContext memoryContext = newSimpleAggregatedMemoryContext().newLocalMemoryContext("test"); - SingleStreamSpiller singleStreamSpiller = spillerFactory.create(TYPES, bytes -> {}, memoryContext); - assertThat(singleStreamSpiller instanceof FileSingleStreamSpiller).isTrue(); - FileSingleStreamSpiller spiller = (FileSingleStreamSpiller) singleStreamSpiller; - - Page page = buildPage(); - - // The spillers will reserve memory in their constructors - assertThat(memoryContext.getBytes()).isEqualTo(4096); - spiller.spill(page).get(); - spiller.spill(Iterators.forArray(page, page, page)).get(); - assertThat(listFiles(spillPath.toPath()).size()).isEqualTo(1); - - // Assert the spill codec flags match the expected configuration - try (InputStream is = newInputStream(listFiles(spillPath.toPath()).get(0))) { - Iterator serializedPages = PagesSerdeUtil.readSerializedPages(is); - assertThat(serializedPages.hasNext()) - .describedAs("at least one page should be successfully read back") - .isTrue(); - Slice serializedPage = serializedPages.next(); - assertThat(isSerializedPageCompressed(serializedPage)).isEqualTo(compressionCodec == LZ4); - assertThat(isSerializedPageEncrypted(serializedPage)).isEqualTo(encryption); + File spillPath = Files.createTempDirectory("tmp").toFile(); + try { + FileSingleStreamSpillerFactory spillerFactory = new FileSingleStreamSpillerFactory( + executor, // executor won't be closed, because we don't call destroy() on the spiller factory + new TestingBlockEncodingSerde(), + new SpillerStats(), + ImmutableList.of(spillPath.toPath()), + 1.0, + compressionCodec, + encryption); + LocalMemoryContext memoryContext = newSimpleAggregatedMemoryContext().newLocalMemoryContext("test"); + SingleStreamSpiller singleStreamSpiller = spillerFactory.create(TYPES, bytes -> {}, memoryContext); + assertThat(singleStreamSpiller instanceof FileSingleStreamSpiller).isTrue(); + FileSingleStreamSpiller spiller = (FileSingleStreamSpiller) singleStreamSpiller; + + Page page = buildPage(); + + // The spillers will reserve memory in their constructors + assertThat(memoryContext.getBytes()).isEqualTo(4096); + spiller.spill(page).get(); + spiller.spill(Iterators.forArray(page, page, page)).get(); + assertThat(listFiles(spillPath.toPath()).size()).isEqualTo(1); + + // Assert the spill codec flags match the expected configuration + try (InputStream is = newInputStream(listFiles(spillPath.toPath()).get(0))) { + Iterator serializedPages = PagesSerdeUtil.readSerializedPages(is); + assertThat(serializedPages.hasNext()) + .describedAs("at least one page should be successfully read back") + .isTrue(); + Slice serializedPage = serializedPages.next(); + assertThat(isSerializedPageCompressed(serializedPage)).isEqualTo(compressionCodec == LZ4); + assertThat(isSerializedPageEncrypted(serializedPage)).isEqualTo(encryption); + } + + // The spillers release their memory reservations when they are closed, therefore at this point + // they will have non-zero memory reservation. + // assertEquals(memoryContext.getBytes(), 0); + + Iterator spilledPagesIterator = spiller.getSpilledPages(); + assertThat(memoryContext.getBytes()).isEqualTo(FileSingleStreamSpiller.BUFFER_SIZE); + ImmutableList spilledPages = ImmutableList.copyOf(spilledPagesIterator); + // The spillers release their memory reservations when they are closed, therefore at this point + // they will have non-zero memory reservation. + // assertEquals(memoryContext.getBytes(), 0); + + assertThat(4).isEqualTo(spilledPages.size()); + for (int i = 0; i < 4; ++i) { + PageAssertions.assertPageEquals(TYPES, page, spilledPages.get(i)); + } + + // Repeated reads are disallowed + assertThatThrownBy(spiller::getSpilledPages) + .isInstanceOf(IllegalStateException.class) + .hasMessage("Repeated reads are disallowed to prevent potential resource leaks"); + + spiller.close(); + assertThat(listFiles(spillPath.toPath()).size()).isEqualTo(0); + assertThat(memoryContext.getBytes()).isEqualTo(0); } - - // The spillers release their memory reservations when they are closed, therefore at this point - // they will have non-zero memory reservation. - // assertEquals(memoryContext.getBytes(), 0); - - Iterator spilledPagesIterator = spiller.getSpilledPages(); - assertThat(memoryContext.getBytes()).isEqualTo(FileSingleStreamSpiller.BUFFER_SIZE); - ImmutableList spilledPages = ImmutableList.copyOf(spilledPagesIterator); - // The spillers release their memory reservations when they are closed, therefore at this point - // they will have non-zero memory reservation. - // assertEquals(memoryContext.getBytes(), 0); - - assertThat(4).isEqualTo(spilledPages.size()); - for (int i = 0; i < 4; ++i) { - PageAssertions.assertPageEquals(TYPES, page, spilledPages.get(i)); + finally { + deleteRecursively(spillPath.toPath(), ALLOW_INSECURE); } - - // Repeated reads are disallowed - assertThatThrownBy(spiller::getSpilledPages) - .isInstanceOf(IllegalStateException.class) - .hasMessage("Repeated reads are disallowed to prevent potential resource leaks"); - - spiller.close(); - assertThat(listFiles(spillPath.toPath()).size()).isEqualTo(0); - assertThat(memoryContext.getBytes()).isEqualTo(0); } private Page buildPage()