From fc221ace26d01267dbdcf3f771533145646ee07d Mon Sep 17 00:00:00 2001
From: Colin
Date: Tue, 16 Jan 2024 13:18:35 -0800
Subject: [PATCH] [BUG] Materialize DataFrames created from in-memory data (#1780)

Half of the fix for #582; writes will be handled in a follow-up.

**Materialize DataFrames created from in-memory data via from_pydict, from_arrow, from_pandas, etc.**

Changes:
- Populate the DataFrame result_cache and preview before returning from from_tables
- Add tests to verify that previews are populated for these calls
- Add tests to verify that preview lengths are correct

Todo:
- If this PR goes through, the docs will also need editing, e.g.
  https://www.getdaft.io/projects/docs/en/latest/user_guide/basic_concepts/dataframe_introduction.html#creating-a-dataframe
  would be outdated
---
 daft/daft.pyi                        |  2 +
 daft/dataframe/dataframe.py          | 70 ++++++++++++++++++++++++++--
 src/common/daft-config/src/lib.rs    |  2 +
 src/common/daft-config/src/python.rs |  5 ++
 tests/conftest.py                    |  9 +++-
 tests/dataframe/test_creation.py     | 25 ++++++++++
 tests/dataframe/test_repr.py         | 55 +++++++++++-----------
 tests/dataframe/test_show.py         | 21 ++++++---
 tests/ray/test_dask.py               | 27 +++++++++++
 tests/ray/test_datasets.py           | 20 ++++++++
 10 files changed, 198 insertions(+), 38 deletions(-)

diff --git a/daft/daft.pyi b/daft/daft.pyi
index c76fe551c4..32f6c52dfc 100644
--- a/daft/daft.pyi
+++ b/daft/daft.pyi
@@ -1149,6 +1149,8 @@ class PyDaftExecutionConfig:
     def broadcast_join_size_bytes_threshold(self): ...
     @property
     def sample_size_for_sort(self): ...
+    @property
+    def num_preview_rows(self): ...
 
 class PyDaftPlanningConfig:
     def with_config_values(
diff --git a/daft/dataframe/dataframe.py b/daft/dataframe/dataframe.py
index f4a74a065d..8c47c05ca1 100644
--- a/daft/dataframe/dataframe.py
+++ b/daft/dataframe/dataframe.py
@@ -285,7 +285,36 @@ def _from_tables(cls, *parts: MicroPartition) -> "DataFrame":
         builder = LogicalPlanBuilder.from_in_memory_scan(
             cache_entry, parts[0].schema(), result_pset.num_partitions(), size_bytes
         )
-        return cls(builder)
+
+        df = cls(builder)
+        df._result_cache = cache_entry
+
+        # build preview
+        num_preview_rows = context.daft_execution_config.num_preview_rows
+        dataframe_num_rows = len(df)
+        if dataframe_num_rows > num_preview_rows:
+            need = num_preview_rows
+            preview_parts = []
+            for part in parts:
+                part_len = len(part)
+                if part_len >= need:  # if this part has enough rows, take what we need and break
+                    preview_parts.append(part.slice(0, need))
+                    break
+                else:  # otherwise, take the whole part and keep going
+                    need -= part_len
+                    preview_parts.append(part)
+
+            preview_results = LocalPartitionSet({i: part for i, part in enumerate(preview_parts)})
+        else:
+            preview_results = result_pset
+
+        # set preview
+        preview_partition = preview_results._get_merged_vpartition()
+        df._preview = DataFramePreview(
+            preview_partition=preview_partition,
+            dataframe_num_rows=dataframe_num_rows,
+        )
+        return df
 
     ###
     # Write methods
@@ -1277,7 +1306,24 @@ def _from_ray_dataset(cls, ds: "RayDataset") -> "DataFrame":
             num_partitions=partition_set.num_partitions(),
             size_bytes=size_bytes,
         )
-        return cls(builder)
+        df = cls(builder)
+        df._result_cache = cache_entry
+
+        # build preview
+        num_preview_rows = context.daft_execution_config.num_preview_rows
+        dataframe_num_rows = len(df)
+        if dataframe_num_rows > num_preview_rows:
+            preview_results, _ = ray_runner_io.partition_set_from_ray_dataset(ds.limit(num_preview_rows))
+        else:
+            preview_results = partition_set
+
+        # set preview
+        preview_partition = preview_results._get_merged_vpartition()
+        df._preview = DataFramePreview(
+            preview_partition=preview_partition,
+            dataframe_num_rows=dataframe_num_rows,
+        )
+        return df
 
     @DataframePublicAPI
     def to_dask_dataframe(
@@ -1349,7 +1395,25 @@ def _from_dask_dataframe(cls, ddf: "dask.DataFrame") -> "DataFrame":
             num_partitions=partition_set.num_partitions(),
             size_bytes=size_bytes,
         )
-        return cls(builder)
+
+        df = cls(builder)
+        df._result_cache = cache_entry
+
+        # build preview
+        num_preview_rows = context.daft_execution_config.num_preview_rows
+        dataframe_num_rows = len(df)
+        if dataframe_num_rows > num_preview_rows:
+            preview_results, _ = ray_runner_io.partition_set_from_dask_dataframe(ddf.loc[: num_preview_rows - 1])
+        else:
+            preview_results = partition_set
+
+        # set preview
+        preview_partition = preview_results._get_merged_vpartition()
+        df._preview = DataFramePreview(
+            preview_partition=preview_partition,
+            dataframe_num_rows=dataframe_num_rows,
+        )
+        return df
 
 
 @dataclass
diff --git a/src/common/daft-config/src/lib.rs b/src/common/daft-config/src/lib.rs
index 4f7f61d776..0c1f761644 100644
--- a/src/common/daft-config/src/lib.rs
+++ b/src/common/daft-config/src/lib.rs
@@ -26,6 +26,7 @@ pub struct DaftExecutionConfig {
     pub merge_scan_tasks_max_size_bytes: usize,
     pub broadcast_join_size_bytes_threshold: usize,
     pub sample_size_for_sort: usize,
+    pub num_preview_rows: usize,
 }
 
 impl Default for DaftExecutionConfig {
@@ -35,6 +36,7 @@ impl Default for DaftExecutionConfig {
             merge_scan_tasks_max_size_bytes: 512 * 1024 * 1024, // 512MB
             broadcast_join_size_bytes_threshold: 10 * 1024 * 1024, // 10 MiB
             sample_size_for_sort: 20,
+            num_preview_rows: 8,
         }
     }
 }
diff --git a/src/common/daft-config/src/python.rs b/src/common/daft-config/src/python.rs
index 5d4a168485..89cc6eeebd 100644
--- a/src/common/daft-config/src/python.rs
+++ b/src/common/daft-config/src/python.rs
@@ -118,6 +118,11 @@ impl PyDaftExecutionConfig {
         Ok(self.config.sample_size_for_sort)
     }
 
+    #[getter]
+    fn get_num_preview_rows(&self) -> PyResult<usize> {
+        Ok(self.config.num_preview_rows)
+    }
+
     fn __reduce__(&self, py: Python) -> PyResult<(PyObject, (Vec<u8>,))> {
         let bin_data = bincode::serialize(self.config.as_ref())
             .expect("DaftExecutionConfig should be serializable to bytes");
diff --git a/tests/conftest.py b/tests/conftest.py
index 4696998424..0515611cc4 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -60,7 +60,12 @@ def uuid_ext_type() -> UuidType:
         "parquet",
     ],
 )
-def make_df(request, tmp_path) -> daft.Dataframe:
+def data_source(request):
+    return request.param
+
+
+@pytest.fixture(scope="function")
+def make_df(data_source, tmp_path) -> daft.Dataframe:
     """Makes a dataframe when provided with data"""
 
     def _make_df(
@@ -79,7 +84,7 @@ def _make_df(
         else:
             raise NotImplementedError(f"make_df not implemented for input type: {type(data)}")
 
-        variant = request.param
+        variant = data_source
         if variant == "arrow":
             df = daft.from_arrow(pa_table)
             if repartition != 1:
diff --git a/tests/dataframe/test_creation.py b/tests/dataframe/test_creation.py
index a3542c9bdd..ebda78eb1f 100644
--- a/tests/dataframe/test_creation.py
+++ b/tests/dataframe/test_creation.py
@@ -84,6 +84,16 @@ def test_create_dataframe_empty_list() -> None:
 
 def test_create_dataframe_list(valid_data: list[dict[str, float]]) -> None:
     df = daft.from_pylist(valid_data)
+    assert len(df) == len(valid_data)
+    assert len(df._preview.preview_partition) == len(valid_data)
+    assert set(df.column_names) == set(COL_NAMES)
+
+
+def test_create_dataframe_list_data_longer_than_preview(valid_data: list[dict[str, float]]) -> None:
+    valid_data = valid_data * 3
+    df = daft.from_pylist(valid_data)
+    assert len(df) == len(valid_data)
+    assert len(df._preview.preview_partition) == 8
     assert set(df.column_names) == set(COL_NAMES)
 
 
@@ -126,6 +136,17 @@ def test_create_dataframe_list_non_dicts() -> None:
 
 def test_create_dataframe_pydict(valid_data: list[dict[str, float]]) -> None:
     pydict = {k: [item[k] for item in valid_data] for k in valid_data[0].keys()}
     df = daft.from_pydict(pydict)
+    assert len(df) == len(valid_data)
+    assert len(df._preview.preview_partition) == len(valid_data)
+    assert set(df.column_names) == set(COL_NAMES)
+
+
+def test_create_dataframe_pydict_data_longer_than_preview(valid_data: list[dict[str, float]]) -> None:
+    valid_data = valid_data * 3
+    pydict = {k: [item[k] for item in valid_data] for k in valid_data[0].keys()}
+    df = daft.from_pydict(pydict)
+    assert len(df) == len(valid_data)
+    assert len(df._preview.preview_partition) == 8
     assert set(df.column_names) == set(COL_NAMES)
 
 
@@ -159,6 +180,8 @@ def test_create_dataframe_arrow(valid_data: list[dict[str, float]], multiple) ->
         df = daft.from_arrow(t)
     if multiple:
         t = pa.concat_tables(t)
+    assert len(df) == len(t)
+    assert len(df._preview.preview_partition) == (8 if multiple else len(t))
     assert set(df.column_names) == set(t.column_names)
     casted_field = t.schema.field("variety").with_type(pa.large_string())
     expected = t.cast(t.schema.set(t.schema.get_field_index("variety"), casted_field))
@@ -270,6 +293,8 @@ def test_create_dataframe_pandas(valid_data: list[dict[str, float]], multiple) -
     df = daft.from_pandas(pd_df)
     if multiple:
         pd_df = pd.concat(pd_df).reset_index(drop=True)
+    assert len(df) == len(pd_df)
+    assert len(df._preview.preview_partition) == (8 if multiple else len(pd_df))
     assert set(df.column_names) == set(pd_df.columns)
     # Check roundtrip.
     pd.testing.assert_frame_equal(df.to_pandas(), pd_df)
diff --git a/tests/dataframe/test_repr.py b/tests/dataframe/test_repr.py
index d32a07c65d..8b1da5a2d3 100644
--- a/tests/dataframe/test_repr.py
+++ b/tests/dataframe/test_repr.py
@@ -174,24 +174,10 @@ def test_alias_repr(make_df):
     )
 
 
-def test_repr_with_unicode(make_df):
+def test_repr_with_unicode(make_df, data_source):
     df = make_df({"🔥": [1, 2, 3], "🦁": ["🔥a", "b🔥", "🦁🔥" * 60]})
-
-    expected_data = {"🔥": ("Int64", []), "🦁": ("Utf8", [])}
-    assert parse_str_table(df.__repr__(), expected_user_msg_regex=UNMATERIALIZED_REGEX) == expected_data
-    assert (
-        df._repr_html_()
-        == """
-🔥
-Int64
-🦁
-Utf8
-(No data to display: Dataframe not materialized)
-"""
-    )
-
-    df.collect()
-
-    expected_data = {
+    expected_data_unmaterialized = {"🔥": ("Int64", []), "🦁": ("Utf8", [])}
+    expected_data_materialized = {
         "🔥": (
             "Int64",
             ["1", "2", "3"],
@@ -201,14 +187,15 @@ def test_repr_with_unicode(make_df):
             ["🔥a", "b🔥", "🦁🔥🦁🔥🦁🔥🦁🔥🦁🔥🦁🔥🦁🔥🦁🔥🦁🔥🦁🔥🦁🔥🦁🔥🦁🔥🦁🔥🦁…"],
         ),
     }
-    expected_data_html = {
-        **expected_data,
-    }
     string_array = ["🔥a", "b🔥", "🦁🔥" * 60]  # we dont truncate for html
-    assert parse_str_table(df.__repr__()) == expected_data
-    assert (
-        df._repr_html_()
-        == f"""
+    expected_html_unmaterialized = """
+🔥
+Int64
+🦁
+Utf8
+(No data to display: Dataframe not materialized)
+"""
+    expected_html_materialized = f"""
@@ -219,12 +206,27 @@ def test_repr_with_unicode(make_df):
 🔥
 Int64
 🦁
 Utf8
 (Showing first 3 of 3 rows)
 """
-    )
+
+    variant = data_source
+    if variant == "parquet":
+        assert (
+            parse_str_table(df.__repr__(), expected_user_msg_regex=UNMATERIALIZED_REGEX) == expected_data_unmaterialized
+        )
+        assert df._repr_html_() == expected_html_unmaterialized
+    elif variant == "arrow":
+        assert (
+            parse_str_table(df.__repr__(), expected_user_msg_regex=SHOWING_N_ROWS_REGEX) == expected_data_materialized
+        )
+        assert df._repr_html_() == expected_html_materialized
+
+    df.collect()
+
+    assert parse_str_table(df.__repr__()) == expected_data_materialized
+    assert df._repr_html_() == expected_html_materialized
 
 
 def test_repr_with_html_string():
     df = daft.from_pydict({"A": [f"body{i}" for i in range(3)]})
-    df.collect()
 
     non_html_table = df.__repr__()
     html_table = df._repr_html_()
@@ -249,7 +251,6 @@ def test_repr_html_custom_hooks():
             "pil": daft.Series.from_pylist([img for _ in range(3)], pyobj="force"),
         }
     )
-    df.collect()
 
     assert (
         ANSI_ESCAPE.sub("", df.__repr__()).replace("\r", "")
diff --git a/tests/dataframe/test_show.py b/tests/dataframe/test_show.py
index 49389c131c..df32865551 100644
--- a/tests/dataframe/test_show.py
+++ b/tests/dataframe/test_show.py
@@ -11,14 +11,18 @@ def test_show_default(make_df, valid_data):
     assert df_display.num_rows == 3
 
 
-def test_show_some(make_df, valid_data):
+def test_show_some(make_df, valid_data, data_source):
     df = make_df(valid_data)
     df_display = df._construct_show_display(1)
 
     assert df_display.schema == df.schema()
     assert len(df_display.preview.preview_partition) == 1
-    # Limit is less than DataFrame length, so we don't know the full DataFrame length.
-    assert df_display.preview.dataframe_num_rows is None
+    # Limit is less than DataFrame length, so we only know full DataFrame length if it was loaded from memory, e.g. arrow.
+    variant = data_source
+    if variant == "parquet":
+        assert df_display.preview.dataframe_num_rows is None
+    elif variant == "arrow":
+        assert df_display.preview.dataframe_num_rows == len(valid_data)
     assert df_display.num_rows == 1
 
 
@@ -48,14 +52,19 @@ def test_show_from_cached_collect_prefix(make_df, valid_data):
     assert df_display.num_rows == 2
 
 
-def test_show_not_from_cached_collect(make_df, valid_data):
+def test_show_not_from_cached_collect(make_df, valid_data, data_source):
     df = make_df(valid_data)
     df = df.collect(2)
     collected_preview = df._preview
     df_display = df._construct_show_display(8)
 
-    # Check that cached preview from df.collect() was NOT used, since it didn't have enough rows.
-    assert df_display.preview != collected_preview
+    variant = data_source
+    if variant == "parquet":
+        # Cached preview from df.collect() is NOT USED because data was not materialized from parquet.
+        assert df_display.preview != collected_preview
+    elif variant == "arrow":
+        # Cached preview from df.collect() is USED because data was materialized from arrow.
+        assert df_display.preview == collected_preview
     assert df_display.schema == df.schema()
     assert len(df_display.preview.preview_partition) == len(valid_data)
     assert df_display.preview.dataframe_num_rows == 3
diff --git a/tests/ray/test_dask.py b/tests/ray/test_dask.py
index 6b8b312243..e33c8173d6 100644
--- a/tests/ray/test_dask.py
+++ b/tests/ray/test_dask.py
@@ -144,3 +144,30 @@ def test_from_dask_dataframe_tensor(n_partitions: int):
     daft_df = daft.from_dask_dataframe(ddf)
     out_df = daft_df.to_pandas()
     pd.testing.assert_frame_equal(out_df, df)
+
+
+@pytest.mark.skipif(get_context().runner_config.name != "ray", reason="Needs to run on Ray runner")
+@pytest.mark.parametrize("n_partitions", [1, 2])
+def test_from_dask_dataframe_preview(n_partitions: int):
+    df = pd.DataFrame(DATA)
+    ddf = dd.from_pandas(df, npartitions=n_partitions)
+
+    daft_df = daft.from_dask_dataframe(ddf)
+    assert len(daft_df) == 3
+    assert len(daft_df._preview.preview_partition) == 3
+
+
+@pytest.mark.skipif(get_context().runner_config.name != "ray", reason="Needs to run on Ray runner")
+@pytest.mark.parametrize("n_partitions", [1, 2])
+def test_from_dask_dataframe_data_longer_than_preview(n_partitions: int):
+    df = pd.DataFrame(
+        {
+            "intcol": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
+            "strcol": ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"],
+        }
+    )
+    ddf = dd.from_pandas(df, npartitions=n_partitions)
+
+    daft_df = daft.from_dask_dataframe(ddf)
+    assert len(daft_df) == 10
+    assert len(daft_df._preview.preview_partition) == 8
diff --git a/tests/ray/test_datasets.py b/tests/ray/test_datasets.py
index 77edd2383c..b385771a50 100644
--- a/tests/ray/test_datasets.py
+++ b/tests/ray/test_datasets.py
@@ -240,3 +240,23 @@ def add_float(df: pd.DataFrame) -> pd.DataFrame:
     df = daft.from_ray_dataset(ds)
     expected_df = add_float(pd_df)
     pd.testing.assert_frame_equal(df.to_pandas(), expected_df)
+
+
+@pytest.mark.skipif(get_context().runner_config.name != "ray", reason="Needs to run on Ray runner")
+@pytest.mark.parametrize("n_partitions", [1, 2])
+def test_from_ray_dataset_preview(n_partitions: int):
+    ds = ray.data.range(3, parallelism=n_partitions)
+
+    df = daft.from_ray_dataset(ds)
+    assert len(df) == 3
+    assert len(df._preview.preview_partition) == 3
+
+
+@pytest.mark.skipif(get_context().runner_config.name != "ray", reason="Needs to run on Ray runner")
+@pytest.mark.parametrize("n_partitions", [1, 2])
+def test_from_ray_dataset_data_longer_than_preview(n_partitions: int):
+    ds = ray.data.range(10, parallelism=n_partitions)
+
+    df = daft.from_ray_dataset(ds)
+    assert len(df) == 10
+    assert len(df._preview.preview_partition) == 8
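
Illustrative sketch of the behaviour this patch enables (not part of the diff; it mirrors the new tests in tests/dataframe/test_creation.py and assumes the default num_preview_rows of 8 from DaftExecutionConfig):

    import daft

    # A DataFrame created from in-memory data now carries a populated result cache
    # and preview, so its repr can show rows without an explicit collect().
    df = daft.from_pydict({"a": list(range(10)), "b": [str(i) for i in range(10)]})

    assert len(df) == 10
    # The preview is capped at the execution config's num_preview_rows (default 8).
    assert len(df._preview.preview_partition) == 8

    print(df)  # shows preview rows instead of "(No data to display: Dataframe not materialized)"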