Commit

Adds more unit tests
Jay Chia committed Dec 6, 2024
1 parent 50c6605 commit f0fdef2
Showing 1 changed file with 67 additions and 16 deletions.
83 changes: 67 additions & 16 deletions tests/io/test_split_scan_tasks.py
@@ -17,17 +17,6 @@ def parquet_files(tmpdir):
     return tmpdir
 
 
-@pytest.fixture(scope="function")
-def many_parquet_files(tmpdir):
-    """Writes 20 Parquet file with 10 rowgroups, each of 100 bytes in size"""
-    for i in range(20):
-        tbl = pa.table({"data": ["aaa"] * 100})
-        path = tmpdir / f"file.{i}.pq"
-        papq.write_table(tbl, str(path), row_group_size=10, use_dictionary=False)
-
-    return tmpdir
-
-
 def test_split_parquet_read(parquet_files):
     with daft.execution_config_ctx(
         scan_tasks_min_size_bytes=1,
@@ -38,11 +27,73 @@ def test_split_parquet_read(parquet_files):
         assert df.to_pydict() == {"data": ["aaa"] * 100}
 
 
-def test_split_parquet_read_many_files(many_parquet_files):
+def test_split_parquet_read_many_files(tmpdir):
+    # Write 20 files into tmpdir
+    for i in range(20):
+        tbl = pa.table({"data": [str(i) for i in range(100)]})
+        path = tmpdir / f"file.{i}.pq"
+        papq.write_table(tbl, str(path), row_group_size=10, use_dictionary=False)
+
     with daft.execution_config_ctx(
-        scan_tasks_min_size_bytes=1,
-        scan_tasks_max_size_bytes=10,
+        scan_tasks_min_size_bytes=20,
+        scan_tasks_max_size_bytes=100,
     ):
-        df = daft.read_parquet(str(many_parquet_files))
+        df = daft.read_parquet(str(tmpdir))
         assert df.num_partitions() == 200, "Should have 200 partitions since we will split all files"
-        assert df.to_pydict() == {"data": ["aaa"] * 2000}
+        assert df.to_pydict() == {"data": [str(i) for i in range(100)] * 20}
+
+
+def test_split_parquet_read_some_splits(tmpdir):
+    # Write a mix of 20 large and 20 small files
+    # Small ones should not be split, large ones should be split into 10 rowgroups each
+    # This gives us a total of 200 + 20 scantasks
+
+    # Write 20 large files into tmpdir
+    large_file_paths = []
+    for i in range(20):
+        tbl = pa.table({"data": [str(f"large{i}") for i in range(100)]})
+        path = tmpdir / f"file.{i}.large.pq"
+        papq.write_table(tbl, str(path), row_group_size=10, use_dictionary=False)
+        large_file_paths.append(str(path))
+
+    # Write 20 small files into tmpdir
+    small_file_paths = []
+    for i in range(20):
+        tbl = pa.table({"data": ["small"]})
+        path = tmpdir / f"file.{i}.small.pq"
+        papq.write_table(tbl, str(path), row_group_size=1, use_dictionary=False)
+        small_file_paths.append(str(path))
+
+    # Test [large_paths, ..., small_paths, ...]
+    with daft.execution_config_ctx(
+        scan_tasks_min_size_bytes=20,
+        scan_tasks_max_size_bytes=100,
+    ):
+        df = daft.read_parquet(large_file_paths + small_file_paths)
+        assert (
+            df.num_partitions() == 220
+        ), "Should have 220 partitions since we will split all large files (20 * 10 rowgroups) but keep small files unsplit"
+        assert df.to_pydict() == {"data": [str(f"large{i}") for i in range(100)] * 20 + ["small"] * 20}
+
+    # Test interleaved [large_path, small_path, large_path, small_path, ...]
+    with daft.execution_config_ctx(
+        scan_tasks_min_size_bytes=20,
+        scan_tasks_max_size_bytes=100,
+    ):
+        interleaved_paths = [path for pair in zip(large_file_paths, small_file_paths) for path in pair]
+        df = daft.read_parquet(interleaved_paths)
+        assert (
+            df.num_partitions() == 220
+        ), "Should have 220 partitions since we will split all large files (20 * 10 rowgroups) but keep small files unsplit"
+        assert df.to_pydict() == {"data": ([str(f"large{i}") for i in range(100)] + ["small"]) * 20}
+
+    # Test [small_paths, ..., large_paths]
+    with daft.execution_config_ctx(
+        scan_tasks_min_size_bytes=20,
+        scan_tasks_max_size_bytes=100,
+    ):
+        df = daft.read_parquet(small_file_paths + large_file_paths)
+        assert (
+            df.num_partitions() == 220
+        ), "Should have 220 partitions since we will split all large files (20 * 10 rowgroups) but keep small files unsplit"
+        assert df.to_pydict() == {"data": ["small"] * 20 + [str(f"large{i}") for i in range(100)] * 20}
