diff --git a/src/daft-parquet/src/stream_reader.rs b/src/daft-parquet/src/stream_reader.rs
index fbc6f19fd7..f9df8d9d35 100644
--- a/src/daft-parquet/src/stream_reader.rs
+++ b/src/daft-parquet/src/stream_reader.rs
@@ -364,7 +364,7 @@ pub(crate) fn local_parquet_read_into_arrow(
                 rg,
                 schema.fields.clone(),
                 Some(chunk_size),
-                num_rows,
+                Some(rg_range.num_rows),
                 None,
             );
             let single_rg_column_iter = single_rg_column_iter?;
diff --git a/tests/integration/io/parquet/test_reads_public_data.py b/tests/integration/io/parquet/test_reads_public_data.py
index ec9c9a397f..3cdea0ecad 100644
--- a/tests/integration/io/parquet/test_reads_public_data.py
+++ b/tests/integration/io/parquet/test_reads_public_data.py
@@ -174,6 +174,10 @@ def get_filesystem_from_path(path: str, **kwargs) -> fsspec.AbstractFileSystem:
         "parquet-benchmarking/s3a-mvp",
         "s3a://daft-public-data/test_fixtures/parquet-dev/mvp.parquet",
     ),
+    (
+        "parquet/test_parquet_limits_across_row_groups",
+        "s3://daft-public-data/test_fixtures/parquet-dev/tpch-issue#2730.parquet",
+    ),
     (
         "azure/mvp/az",
         "az://public-anonymous/mvp.parquet",
diff --git a/tests/io/test_parquet.py b/tests/io/test_parquet.py
index 7deb83c46f..ad44155657 100644
--- a/tests/io/test_parquet.py
+++ b/tests/io/test_parquet.py
@@ -6,6 +6,7 @@ import tempfile
 import uuid
 
+import numpy as np
 import pyarrow as pa
 import pyarrow.parquet as papq
 import pytest
 
@@ -159,8 +160,6 @@ def test_row_groups():
 @pytest.mark.integration()
 @pytest.mark.parametrize("chunk_size", [5, 1024, 2048, 4096])
 def test_parquet_rows_cross_page_boundaries(tmpdir, minio_io_config, chunk_size):
-    import numpy as np
-
     int64_min = -(2**63)
     int64_max = 2**63 - 1
 
@@ -346,3 +345,24 @@ def test_parquet_helper(data_and_type, use_daft_writer):
     # One column uses a single dictionary-encoded data page, and the other contains data pages with
     # 4096 values each.
     test_parquet_helper(get_string_data_and_type(8192, 300, 1), True)
+
+
+def test_parquet_limits_across_row_groups(tmpdir):
+    row_group_size = 1024
+    int_array = np.full(shape=4096, fill_value=3, dtype=np.int32)
+    before = pa.Table.from_arrays(
+        [
+            pa.array(int_array, type=pa.int32()),
+        ],
+        names=["col"],
+    )
+    file_path = f"{tmpdir}/{str(uuid.uuid4())}.parquet"
+    papq.write_table(before, file_path, row_group_size=row_group_size)
+    assert (
+        before.take(list(range(min(before.num_rows, row_group_size + 10)))).to_pydict()
+        == daft.read_parquet(file_path).limit(row_group_size + 10).to_pydict()
+    )
+    assert (
+        before.take(list(range(min(before.num_rows, row_group_size * 2)))).to_pydict()
+        == daft.read_parquet(file_path).limit(row_group_size * 2).to_pydict()
+    )
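
For context on the one-line Rust change above: local_parquet_read_into_arrow previously passed the global num_rows limit into each row group's column iterator, and the fix passes the per-row-group count from rg_range instead. Below is a minimal, hypothetical Python sketch of how a global limit decomposes into per-row-group read counts, the quantity the patched call now forwards. The names RowGroupRange and plan_row_group_reads are illustrative only, not Daft's actual types or functions.

    # Hypothetical sketch; `RowGroupRange` and `plan_row_group_reads` are
    # illustrative names, not Daft's actual API.
    from __future__ import annotations

    from dataclasses import dataclass


    @dataclass
    class RowGroupRange:
        index: int     # which row group to read
        num_rows: int  # rows to take from this row group, clamped to the remaining limit


    def plan_row_group_reads(row_group_sizes: list[int], limit: int | None) -> list[RowGroupRange]:
        """Split a global row limit into per-row-group read counts."""
        remaining = limit
        ranges: list[RowGroupRange] = []
        for i, size in enumerate(row_group_sizes):
            if remaining is not None and remaining <= 0:
                break
            # Each row group contributes at most its own row count.
            take = size if remaining is None else min(size, remaining)
            ranges.append(RowGroupRange(index=i, num_rows=take))
            if remaining is not None:
                remaining -= take
        return ranges


    # Mirrors the new test: a 4096-row file split into 1024-row row groups.
    # limit = 1034 must take 1024 rows from row group 0 and 10 from row group 1,
    # rather than applying the full limit to each row group independently.
    assert [r.num_rows for r in plan_row_group_reads([1024] * 4, 1034)] == [1024, 10]
    assert [r.num_rows for r in plan_row_group_reads([1024] * 4, 2048)] == [1024, 1024]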