Skip to content

Commit

Permalink
Fix flaky test
Browse files Browse the repository at this point in the history
  • Loading branch information
losipiuk committed Apr 5, 2024
1 parent 8fa0990 commit 47112db
Showing 1 changed file with 60 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,11 @@
import io.trino.spi.block.TestingBlockEncodingSerde;
import io.trino.spi.type.Type;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.parallel.Execution;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.util.Iterator;
Expand Down Expand Up @@ -65,21 +63,12 @@ public class TestFileSingleStreamSpiller
private static final List<Type> TYPES = ImmutableList.of(BIGINT, DOUBLE, VARBINARY);

private final ListeningExecutorService executor = listeningDecorator(newCachedThreadPool());
private File spillPath;

@BeforeAll
public void setUp()
throws IOException
{
spillPath = Files.createTempDirectory("tmp").toFile();
}

@AfterAll
public void tearDown()
throws Exception
{
executor.shutdown();
deleteRecursively(spillPath.toPath(), ALLOW_INSECURE);
}

@Test
Expand Down Expand Up @@ -113,62 +102,68 @@ public void testSpillEncryptionWithCompression()
private void assertSpill(CompressionCodec compressionCodec, boolean encryption)
throws Exception
{
FileSingleStreamSpillerFactory spillerFactory = new FileSingleStreamSpillerFactory(
executor, // executor won't be closed, because we don't call destroy() on the spiller factory
new TestingBlockEncodingSerde(),
new SpillerStats(),
ImmutableList.of(spillPath.toPath()),
1.0,
compressionCodec,
encryption);
LocalMemoryContext memoryContext = newSimpleAggregatedMemoryContext().newLocalMemoryContext("test");
SingleStreamSpiller singleStreamSpiller = spillerFactory.create(TYPES, bytes -> {}, memoryContext);
assertThat(singleStreamSpiller instanceof FileSingleStreamSpiller).isTrue();
FileSingleStreamSpiller spiller = (FileSingleStreamSpiller) singleStreamSpiller;

Page page = buildPage();

// The spillers will reserve memory in their constructors
assertThat(memoryContext.getBytes()).isEqualTo(4096);
spiller.spill(page).get();
spiller.spill(Iterators.forArray(page, page, page)).get();
assertThat(listFiles(spillPath.toPath()).size()).isEqualTo(1);

// Assert the spill codec flags match the expected configuration
try (InputStream is = newInputStream(listFiles(spillPath.toPath()).get(0))) {
Iterator<Slice> serializedPages = PagesSerdeUtil.readSerializedPages(is);
assertThat(serializedPages.hasNext())
.describedAs("at least one page should be successfully read back")
.isTrue();
Slice serializedPage = serializedPages.next();
assertThat(isSerializedPageCompressed(serializedPage)).isEqualTo(compressionCodec == LZ4);
assertThat(isSerializedPageEncrypted(serializedPage)).isEqualTo(encryption);
File spillPath = Files.createTempDirectory("tmp").toFile();
try {
FileSingleStreamSpillerFactory spillerFactory = new FileSingleStreamSpillerFactory(
executor, // executor won't be closed, because we don't call destroy() on the spiller factory
new TestingBlockEncodingSerde(),
new SpillerStats(),
ImmutableList.of(spillPath.toPath()),
1.0,
compressionCodec,
encryption);
LocalMemoryContext memoryContext = newSimpleAggregatedMemoryContext().newLocalMemoryContext("test");
SingleStreamSpiller singleStreamSpiller = spillerFactory.create(TYPES, bytes -> {}, memoryContext);
assertThat(singleStreamSpiller instanceof FileSingleStreamSpiller).isTrue();
FileSingleStreamSpiller spiller = (FileSingleStreamSpiller) singleStreamSpiller;

Page page = buildPage();

// The spillers will reserve memory in their constructors
assertThat(memoryContext.getBytes()).isEqualTo(4096);
spiller.spill(page).get();
spiller.spill(Iterators.forArray(page, page, page)).get();
assertThat(listFiles(spillPath.toPath()).size()).isEqualTo(1);

// Assert the spill codec flags match the expected configuration
try (InputStream is = newInputStream(listFiles(spillPath.toPath()).get(0))) {
Iterator<Slice> serializedPages = PagesSerdeUtil.readSerializedPages(is);
assertThat(serializedPages.hasNext())
.describedAs("at least one page should be successfully read back")
.isTrue();
Slice serializedPage = serializedPages.next();
assertThat(isSerializedPageCompressed(serializedPage)).isEqualTo(compressionCodec == LZ4);
assertThat(isSerializedPageEncrypted(serializedPage)).isEqualTo(encryption);
}

// The spillers release their memory reservations when they are closed, therefore at this point
// they will have non-zero memory reservation.
// assertEquals(memoryContext.getBytes(), 0);

Iterator<Page> spilledPagesIterator = spiller.getSpilledPages();
assertThat(memoryContext.getBytes()).isEqualTo(FileSingleStreamSpiller.BUFFER_SIZE);
ImmutableList<Page> spilledPages = ImmutableList.copyOf(spilledPagesIterator);
// The spillers release their memory reservations when they are closed, therefore at this point
// they will have non-zero memory reservation.
// assertEquals(memoryContext.getBytes(), 0);

assertThat(4).isEqualTo(spilledPages.size());
for (int i = 0; i < 4; ++i) {
PageAssertions.assertPageEquals(TYPES, page, spilledPages.get(i));
}

// Repeated reads are disallowed
assertThatThrownBy(spiller::getSpilledPages)
.isInstanceOf(IllegalStateException.class)
.hasMessage("Repeated reads are disallowed to prevent potential resource leaks");

spiller.close();
assertThat(listFiles(spillPath.toPath()).size()).isEqualTo(0);
assertThat(memoryContext.getBytes()).isEqualTo(0);
}

// The spillers release their memory reservations when they are closed, therefore at this point
// they will have non-zero memory reservation.
// assertEquals(memoryContext.getBytes(), 0);

Iterator<Page> spilledPagesIterator = spiller.getSpilledPages();
assertThat(memoryContext.getBytes()).isEqualTo(FileSingleStreamSpiller.BUFFER_SIZE);
ImmutableList<Page> spilledPages = ImmutableList.copyOf(spilledPagesIterator);
// The spillers release their memory reservations when they are closed, therefore at this point
// they will have non-zero memory reservation.
// assertEquals(memoryContext.getBytes(), 0);

assertThat(4).isEqualTo(spilledPages.size());
for (int i = 0; i < 4; ++i) {
PageAssertions.assertPageEquals(TYPES, page, spilledPages.get(i));
finally {
deleteRecursively(spillPath.toPath(), ALLOW_INSECURE);
}

// Repeated reads are disallowed
assertThatThrownBy(spiller::getSpilledPages)
.isInstanceOf(IllegalStateException.class)
.hasMessage("Repeated reads are disallowed to prevent potential resource leaks");

spiller.close();
assertThat(listFiles(spillPath.toPath()).size()).isEqualTo(0);
assertThat(memoryContext.getBytes()).isEqualTo(0);
}

private Page buildPage()
Expand Down

0 comments on commit 47112db

Please sign in to comment.