From 1d81c4d4652f66e4b57f1190017600c6a887ecc7 Mon Sep 17 00:00:00 2001
From: Colin Ho
Date: Tue, 6 Feb 2024 22:08:06 +0800
Subject: [PATCH 1/5] add fix

---
 src/daft-micropartition/src/micropartition.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/daft-micropartition/src/micropartition.rs b/src/daft-micropartition/src/micropartition.rs
index ef3da4da43..ea98cb15fb 100644
--- a/src/daft-micropartition/src/micropartition.rs
+++ b/src/daft-micropartition/src/micropartition.rs
@@ -187,7 +187,7 @@ fn materialize_scan_task(
                 column_names
                     .as_ref()
                     .map(|cols| cols.iter().map(|col| col.to_string()).collect()),
-                None,
+                Some(scan_task.schema.clone()),
                 scan_task.pushdowns.filters.clone(),
             );
             let parse_options = JsonParseOptions::new_internal();

From e58d3ef588a1e5e286e305690573329c943e1af2 Mon Sep 17 00:00:00 2001
From: Colin Ho
Date: Wed, 7 Feb 2024 18:42:49 +0800
Subject: [PATCH 2/5] unit test

---
 tests/dataframe/test_creation.py | 35 ++++++++++++++++++++++++++++++++
 1 file changed, 35 insertions(+)

diff --git a/tests/dataframe/test_creation.py b/tests/dataframe/test_creation.py
index ed93d18451..c178f01864 100644
--- a/tests/dataframe/test_creation.py
+++ b/tests/dataframe/test_creation.py
@@ -797,6 +797,41 @@ def test_create_dataframe_json_schema_hints_ignore_random_hint(valid_data: list[
     assert len(pd_df) == len(valid_data)
 
 
+def test_create_dataframe_json_schema_hints_large_file() -> None:
+    # First assemble data that will be larger than 1MB, because our schema inference will max out at 1MB.
+    item = {"column": {"test_key": "test_value"}}
+    item_size = len(json.dumps(item).encode("utf-8"))
+    entries_needed = (1 * 1024 * 1024) // item_size + 1
+    data = [item] * entries_needed
+
+    # Add a row at the end of the file with a different key to ensure that the schema inference doesn't pick it up
+    data.append({"column": {"TEST_KEY_BOTTOM_OF_FILE": "TEST_VALUE_BOTTOM_OF_FILE"}})
+
+    with create_temp_filename() as fname:
+        with open(fname, "w") as f:
+            for row in data:
+                f.write(json.dumps(row))
+                f.write("\n")
+            f.flush()
+
+        df = daft.read_json(
+            fname,
+            schema_hints={
+                "column": DataType.struct({"test_key": DataType.string(), "TEST_KEY_BOTTOM_OF_FILE": DataType.string()})
+            },
+        )
+        assert df.schema()["column"].dtype == DataType.struct(
+            {"test_key": DataType.string(), "TEST_KEY_BOTTOM_OF_FILE": DataType.string()}
+        )
+
+        # When dataframe is materialized, the schema hints should be enforced and the key value pair at the bottom should not be null
+        df = df.select(df["column"].struct.get("TEST_KEY_BOTTOM_OF_FILE"))
+        df = df.where(df["TEST_KEY_BOTTOM_OF_FILE"].not_null()).collect()
+
+        assert len(df) == 1
+        assert df.to_pydict()["TEST_KEY_BOTTOM_OF_FILE"][0] == "TEST_VALUE_BOTTOM_OF_FILE"
+
+
 @pytest.mark.parametrize(
     "input,expected",
     [

From 3547bc2538d87dc776991971372dd7a6a02ed3d4 Mon Sep 17 00:00:00 2001
From: Colin Ho
Date: Wed, 7 Feb 2024 18:45:54 +0800
Subject: [PATCH 3/5] unit test edit

---
 tests/dataframe/test_creation.py | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/tests/dataframe/test_creation.py b/tests/dataframe/test_creation.py
index c178f01864..e0209bd7d9 100644
--- a/tests/dataframe/test_creation.py
+++ b/tests/dataframe/test_creation.py
@@ -814,6 +814,10 @@ def test_create_dataframe_json_schema_hints_large_file() -> None:
                 f.write("\n")
             f.flush()
 
+        # Without schema hints, schema inference should not pick up the key value pair at the bottom of the file
+        assert daft.read_json(fname).schema()["column"].dtype == DataType.struct({"test_key": DataType.string()})
+
+        # With schema hints, the bottom kv pair should be included
         df = daft.read_json(
             fname,
             schema_hints={

From 9b2d18f01014d30197b22a45c16e12a9ca4bcdfa Mon Sep 17 00:00:00 2001
From: Colin Ho
Date: Wed, 7 Feb 2024 21:47:53 +0800
Subject: [PATCH 4/5] remove 1 *

---
 tests/dataframe/test_creation.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tests/dataframe/test_creation.py b/tests/dataframe/test_creation.py
index e0209bd7d9..5668be4af2 100644
--- a/tests/dataframe/test_creation.py
+++ b/tests/dataframe/test_creation.py
@@ -801,7 +801,7 @@ def test_create_dataframe_json_schema_hints_large_file() -> None:
     # First assemble data that will be larger than 1MB, because our schema inference will max out at 1MB.
     item = {"column": {"test_key": "test_value"}}
     item_size = len(json.dumps(item).encode("utf-8"))
-    entries_needed = (1 * 1024 * 1024) // item_size + 1
+    entries_needed = (1024 * 1024) // item_size + 1
     data = [item] * entries_needed
 
     # Add a row at the end of the file with a different key to ensure that the schema inference doesn't pick it up

From c0116e5c34b22f91b1b9fe6082c9ba836b7784ff Mon Sep 17 00:00:00 2001
From: Colin Ho
Date: Fri, 9 Feb 2024 13:59:15 +0800
Subject: [PATCH 5/5] simplify test

---
 tests/dataframe/test_creation.py | 47 +++++++++++++-------------------
 1 file changed, 19 insertions(+), 28 deletions(-)

diff --git a/tests/dataframe/test_creation.py b/tests/dataframe/test_creation.py
index 5668be4af2..7d92478ecd 100644
--- a/tests/dataframe/test_creation.py
+++ b/tests/dataframe/test_creation.py
@@ -797,43 +797,34 @@ def test_create_dataframe_json_schema_hints_ignore_random_hint(valid_data: list[
     assert len(pd_df) == len(valid_data)
 
 
-def test_create_dataframe_json_schema_hints_large_file() -> None:
-    # First assemble data that will be larger than 1MB, because our schema inference will max out at 1MB.
-    item = {"column": {"test_key": "test_value"}}
-    item_size = len(json.dumps(item).encode("utf-8"))
-    entries_needed = (1024 * 1024) // item_size + 1
-    data = [item] * entries_needed
-
-    # Add a row at the end of the file with a different key to ensure that the schema inference doesn't pick it up
-    data.append({"column": {"TEST_KEY_BOTTOM_OF_FILE": "TEST_VALUE_BOTTOM_OF_FILE"}})
-
-    with create_temp_filename() as fname:
+def test_create_dataframe_json_schema_hints_two_files() -> None:
+    with create_temp_filename() as fname, create_temp_filename() as fname2:
         with open(fname, "w") as f:
-            for row in data:
-                f.write(json.dumps(row))
-                f.write("\n")
+            f.write(json.dumps({"foo": {"bar": "baz"}}))
+            f.write("\n")
+            f.flush()
+
+        with open(fname2, "w") as f:
+            f.write(json.dumps({"foo": {"bar2": "baz2"}}))
+            f.write("\n")
             f.flush()
 
-        # Without schema hints, schema inference should not pick up the key value pair at the bottom of the file
-        assert daft.read_json(fname).schema()["column"].dtype == DataType.struct({"test_key": DataType.string()})
+        # Without schema hints, schema inference should not pick up bar2
+        assert daft.read_json([fname, fname2]).schema()["foo"].dtype == DataType.struct({"bar": DataType.string()})
 
-        # With schema hints, the bottom kv pair should be included
+        # With schema hints, bar2 should be included
         df = daft.read_json(
-            fname,
-            schema_hints={
-                "column": DataType.struct({"test_key": DataType.string(), "TEST_KEY_BOTTOM_OF_FILE": DataType.string()})
-            },
+            [fname, fname2],
+            schema_hints={"foo": DataType.struct({"bar": DataType.string(), "bar2": DataType.string()})},
         )
-        assert df.schema()["column"].dtype == DataType.struct(
-            {"test_key": DataType.string(), "TEST_KEY_BOTTOM_OF_FILE": DataType.string()}
-        )
+        assert df.schema()["foo"].dtype == DataType.struct({"bar": DataType.string(), "bar2": DataType.string()})
 
-        # When dataframe is materialized, the schema hints should be enforced and the key value pair at the bottom should not be null
-        df = df.select(df["column"].struct.get("TEST_KEY_BOTTOM_OF_FILE"))
-        df = df.where(df["TEST_KEY_BOTTOM_OF_FILE"].not_null()).collect()
+        # When dataframe is materialized, the schema hints should be enforced and bar2 should be included
+        df = df.select(df["foo"].struct.get("bar2"))
+        df = df.where(df["bar2"].not_null()).collect()
 
         assert len(df) == 1
-        assert df.to_pydict()["TEST_KEY_BOTTOM_OF_FILE"][0] == "TEST_VALUE_BOTTOM_OF_FILE"
+        assert df.to_pydict()["bar2"][0] == "baz2"
 
 
 @pytest.mark.parametrize(