diff --git a/src/daft-parquet/src/stream_reader.rs b/src/daft-parquet/src/stream_reader.rs
index fbc6f19fd7..f9df8d9d35 100644
--- a/src/daft-parquet/src/stream_reader.rs
+++ b/src/daft-parquet/src/stream_reader.rs
@@ -364,7 +364,7 @@ pub(crate) fn local_parquet_read_into_arrow(
                 rg,
                 schema.fields.clone(),
                 Some(chunk_size),
-                num_rows,
+                Some(rg_range.num_rows),
                 None,
             );
             let single_rg_column_iter = single_rg_column_iter?;
diff --git a/tests/io/test_parquet.py b/tests/io/test_parquet.py
index 7deb83c46f..be2be34fca 100644
--- a/tests/io/test_parquet.py
+++ b/tests/io/test_parquet.py
@@ -6,6 +6,7 @@
 import tempfile
 import uuid
 
+import numpy as np
 import pyarrow as pa
 import pyarrow.parquet as papq
 import pytest
@@ -159,8 +160,6 @@ def test_row_groups():
 @pytest.mark.integration()
 @pytest.mark.parametrize("chunk_size", [5, 1024, 2048, 4096])
 def test_parquet_rows_cross_page_boundaries(tmpdir, minio_io_config, chunk_size):
-    import numpy as np
-
     int64_min = -(2**63)
     int64_max = 2**63 - 1
 
@@ -346,3 +345,39 @@ def test_parquet_helper(data_and_type, use_daft_writer):
     # One column uses a single dictionary-encoded data page, and the other contains data pages with
     # 4096 values each.
     test_parquet_helper(get_string_data_and_type(8192, 300, 1), True)
+
+
+@pytest.mark.integration()
+def test_parquet_limits_across_row_groups(tmpdir, minio_io_config):
+    test_row_group_size = 1024
+    daft_execution_config = daft.context.get_context().daft_execution_config
+    default_row_group_size = daft_execution_config.parquet_target_row_group_size
+    int_array = np.full(shape=4096, fill_value=3, dtype=np.int32)
+    before = daft.from_pydict({"col": pa.array(int_array, type=pa.int32())})
+    file_path = f"{tmpdir}/{str(uuid.uuid4())}.parquet"
+    # Decrease the target row group size before writing the parquet file.
+    daft.set_execution_config(parquet_target_row_group_size=test_row_group_size)
+    before.write_parquet(file_path)
+    assert (
+        before.limit(test_row_group_size + 10).to_arrow()
+        == daft.read_parquet(file_path).limit(test_row_group_size + 10).to_arrow()
+    )
+    assert (
+        before.limit(test_row_group_size * 2).to_arrow()
+        == daft.read_parquet(file_path).limit(test_row_group_size * 2).to_arrow()
+    )
+
+    bucket_name = "my-bucket"
+    s3_path = f"s3://{bucket_name}/my-folder"
+    with minio_create_bucket(minio_io_config=minio_io_config, bucket_name=bucket_name):
+        before.write_parquet(s3_path, io_config=minio_io_config)
+        assert (
+            before.limit(test_row_group_size + 10).to_arrow()
+            == daft.read_parquet(s3_path, io_config=minio_io_config).limit(test_row_group_size + 10).to_arrow()
+        )
+        assert (
+            before.limit(test_row_group_size * 2).to_arrow()
+            == daft.read_parquet(s3_path, io_config=minio_io_config).limit(test_row_group_size * 2).to_arrow()
+        )
+    # Reset the target row group size.
+    daft.set_execution_config(parquet_target_row_group_size=default_row_group_size)
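
The Rust hunk changes the per-row-group column iterator to request `rg_range.num_rows` (the row group's own row count) rather than the overall `num_rows` limit, so a limit that spans a row-group boundary no longer under-reads the later row groups. Below is a minimal standalone sketch of the scenario the new integration test exercises, assuming only the daft APIs already used in the test above (`from_pydict`, `set_execution_config`, `write_parquet`, `read_parquet`, `limit`, `to_arrow`); the output path is illustrative, not part of the change.

```python
# Minimal sketch: a limit that crosses a row-group boundary should still return
# exactly `limit` rows. Uses the same daft APIs as test_parquet_limits_across_row_groups;
# the output path below is a placeholder.
import numpy as np
import pyarrow as pa

import daft

row_group_size = 1024

# 4096 identical int32 rows, written with a small target row group size so the
# resulting Parquet file contains several row groups.
df = daft.from_pydict({"col": pa.array(np.full(4096, 3, dtype=np.int32), type=pa.int32())})
daft.set_execution_config(parquet_target_row_group_size=row_group_size)
df.write_parquet("limits_across_row_groups.parquet")

# A limit ending 10 rows into the second row group; before the stream_reader fix,
# the per-row-group read could stop short of the requested number of rows.
limit = row_group_size + 10
read_back = daft.read_parquet("limits_across_row_groups.parquet").limit(limit).to_arrow()
assert read_back.num_rows == limit
```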