From f0fdef23587fe42918f389056e0b3c22e2b7c7c4 Mon Sep 17 00:00:00 2001
From: Jay Chia
Date: Thu, 5 Dec 2024 15:25:25 -0800
Subject: [PATCH] Adds more unit tests

---
 tests/io/test_split_scan_tasks.py | 83 +++++++++++++++++++++++++------
 1 file changed, 67 insertions(+), 16 deletions(-)

diff --git a/tests/io/test_split_scan_tasks.py b/tests/io/test_split_scan_tasks.py
index ba3a1432f4..891b6a16a0 100644
--- a/tests/io/test_split_scan_tasks.py
+++ b/tests/io/test_split_scan_tasks.py
@@ -17,17 +17,6 @@ def parquet_files(tmpdir):
     return tmpdir
 
 
-@pytest.fixture(scope="function")
-def many_parquet_files(tmpdir):
-    """Writes 20 Parquet file with 10 rowgroups, each of 100 bytes in size"""
-    for i in range(20):
-        tbl = pa.table({"data": ["aaa"] * 100})
-        path = tmpdir / f"file.{i}.pq"
-        papq.write_table(tbl, str(path), row_group_size=10, use_dictionary=False)
-
-    return tmpdir
-
-
 def test_split_parquet_read(parquet_files):
     with daft.execution_config_ctx(
         scan_tasks_min_size_bytes=1,
@@ -38,11 +27,73 @@ def test_split_parquet_read(parquet_files):
     assert df.to_pydict() == {"data": ["aaa"] * 100}
 
 
-def test_split_parquet_read_many_files(many_parquet_files):
+def test_split_parquet_read_many_files(tmpdir):
+    # Write 20 files into tmpdir
+    for i in range(20):
+        tbl = pa.table({"data": [str(i) for i in range(100)]})
+        path = tmpdir / f"file.{i}.pq"
+        papq.write_table(tbl, str(path), row_group_size=10, use_dictionary=False)
+
     with daft.execution_config_ctx(
-        scan_tasks_min_size_bytes=1,
-        scan_tasks_max_size_bytes=10,
+        scan_tasks_min_size_bytes=20,
+        scan_tasks_max_size_bytes=100,
     ):
-        df = daft.read_parquet(str(many_parquet_files))
+        df = daft.read_parquet(str(tmpdir))
         assert df.num_partitions() == 200, "Should have 200 partitions since we will split all files"
-        assert df.to_pydict() == {"data": ["aaa"] * 2000}
+        assert df.to_pydict() == {"data": [str(i) for i in range(100)] * 20}
+
+
+def test_split_parquet_read_some_splits(tmpdir):
+    # Write a mix of 20 large and 20 small files
+    # Small ones should not be split, large ones should be split into 10 rowgroups each
+    # This gives us a total of 200 + 20 scantasks
+
+    # Write 20 large files into tmpdir
+    large_file_paths = []
+    for i in range(20):
+        tbl = pa.table({"data": [str(f"large{i}") for i in range(100)]})
+        path = tmpdir / f"file.{i}.large.pq"
+        papq.write_table(tbl, str(path), row_group_size=10, use_dictionary=False)
+        large_file_paths.append(str(path))
+
+    # Write 20 small files into tmpdir
+    small_file_paths = []
+    for i in range(20):
+        tbl = pa.table({"data": ["small"]})
+        path = tmpdir / f"file.{i}.small.pq"
+        papq.write_table(tbl, str(path), row_group_size=1, use_dictionary=False)
+        small_file_paths.append(str(path))
+
+    # Test [large_paths, ..., small_paths, ...]
+    with daft.execution_config_ctx(
+        scan_tasks_min_size_bytes=20,
+        scan_tasks_max_size_bytes=100,
+    ):
+        df = daft.read_parquet(large_file_paths + small_file_paths)
+        assert (
+            df.num_partitions() == 220
+        ), "Should have 220 partitions since we will split all large files (20 * 10 rowgroups) but keep small files unsplit"
+        assert df.to_pydict() == {"data": [str(f"large{i}") for i in range(100)] * 20 + ["small"] * 20}
+
+    # Test interleaved [large_path, small_path, large_path, small_path, ...]
+    with daft.execution_config_ctx(
+        scan_tasks_min_size_bytes=20,
+        scan_tasks_max_size_bytes=100,
+    ):
+        interleaved_paths = [path for pair in zip(large_file_paths, small_file_paths) for path in pair]
+        df = daft.read_parquet(interleaved_paths)
+        assert (
+            df.num_partitions() == 220
+        ), "Should have 220 partitions since we will split all large files (20 * 10 rowgroups) but keep small files unsplit"
+        assert df.to_pydict() == {"data": ([str(f"large{i}") for i in range(100)] + ["small"]) * 20}
+
+    # Test [small_paths, ..., large_paths]
+    with daft.execution_config_ctx(
+        scan_tasks_min_size_bytes=20,
+        scan_tasks_max_size_bytes=100,
+    ):
+        df = daft.read_parquet(small_file_paths + large_file_paths)
+        assert (
+            df.num_partitions() == 220
+        ), "Should have 220 partitions since we will split all large files (20 * 10 rowgroups) but keep small files unsplit"
+        assert df.to_pydict() == {"data": ["small"] * 20 + [str(f"large{i}") for i in range(100)] * 20}