From 007425f53e38875358fa923b1e5137eddf97f9e7 Mon Sep 17 00:00:00 2001 From: clarkzinzow Date: Wed, 29 Nov 2023 12:06:41 -0800 Subject: [PATCH] Fix tests and style. --- daft/logical/schema.py | 3 +- daft/table/table_io.py | 2 +- src/daft-json/test/iris_tiny.jsonl | 2 +- .../test/iris_tiny_all_null_column.jsonl | 2 +- .../test/iris_tiny_conflicting_dtypes.jsonl | 2 +- src/daft-json/test/iris_tiny_nulls.jsonl | 2 +- src/daft-micropartition/Cargo.toml | 2 +- src/daft-plan/src/optimization/optimizer.rs | 8 +++-- .../optimization/rules/drop_repartition.rs | 2 +- .../optimization/rules/push_down_filter.rs | 30 +++++++++---------- .../src/optimization/rules/push_down_limit.rs | 23 +++++++------- .../rules/push_down_projection.rs | 14 ++++----- src/daft-plan/src/test/mod.rs | 16 ++++------ tests/table/table_io/test_json.py | 5 ++-- 14 files changed, 55 insertions(+), 58 deletions(-) diff --git a/daft/logical/schema.py b/daft/logical/schema.py index 99bff58da5..c2533df7b4 100644 --- a/daft/logical/schema.py +++ b/daft/logical/schema.py @@ -3,8 +3,7 @@ import sys from typing import TYPE_CHECKING, Iterator -from daft.daft import CsvParseOptions -from daft.daft import JsonParseOptions +from daft.daft import CsvParseOptions, JsonParseOptions from daft.daft import PyField as _PyField from daft.daft import PySchema as _PySchema from daft.daft import read_csv_schema as _read_csv_schema diff --git a/daft/table/table_io.py b/daft/table/table_io.py index d623079ba9..e587bf5778 100644 --- a/daft/table/table_io.py +++ b/daft/table/table_io.py @@ -16,10 +16,10 @@ CsvConvertOptions, CsvParseOptions, CsvReadOptions, + IOConfig, JsonConvertOptions, JsonParseOptions, JsonReadOptions, - IOConfig, NativeStorageConfig, PythonStorageConfig, StorageConfig, diff --git a/src/daft-json/test/iris_tiny.jsonl b/src/daft-json/test/iris_tiny.jsonl index 84f3a08f99..99926cc27d 100644 --- a/src/daft-json/test/iris_tiny.jsonl +++ b/src/daft-json/test/iris_tiny.jsonl @@ -17,4 +17,4 @@ {"sepalLength": 5.4, "sepalWidth": 3.9, "petalLength": 1.3, "petalWidth": 0.4, "species": "setosa"} {"sepalLength": 5.1, "sepalWidth": 3.5, "petalLength": 1.4, "petalWidth": 0.3, "species": "setosa"} {"sepalLength": 5.7, "sepalWidth": 3.8, "petalLength": 1.7, "petalWidth": 0.3, "species": "setosa"} -{"sepalLength": 5.1, "sepalWidth": 3.8, "petalLength": 1.5, "petalWidth": 0.3, "species": "setosa"} \ No newline at end of file +{"sepalLength": 5.1, "sepalWidth": 3.8, "petalLength": 1.5, "petalWidth": 0.3, "species": "setosa"} diff --git a/src/daft-json/test/iris_tiny_all_null_column.jsonl b/src/daft-json/test/iris_tiny_all_null_column.jsonl index 27588f62f9..3e73b3ef6a 100644 --- a/src/daft-json/test/iris_tiny_all_null_column.jsonl +++ b/src/daft-json/test/iris_tiny_all_null_column.jsonl @@ -3,4 +3,4 @@ {"sepalLength": 4.7, "sepalWidth": 3.2, "petalLength": null, "petalWidth": 0.2, "species": "setosa"} {"sepalLength": 4.6, "sepalWidth": 3.1, "petalLength": null, "petalWidth": 0.2, "species": "setosa"} {"sepalLength": 5.0, "sepalWidth": 3.6, "petalLength": null, "petalWidth": 0.2, "species": "setosa"} -{"sepalLength": 5.4, "sepalWidth": 3.9, "petalLength": null, "petalWidth": 0.4, "species": "setosa"} \ No newline at end of file +{"sepalLength": 5.4, "sepalWidth": 3.9, "petalLength": null, "petalWidth": 0.4, "species": "setosa"} diff --git a/src/daft-json/test/iris_tiny_conflicting_dtypes.jsonl b/src/daft-json/test/iris_tiny_conflicting_dtypes.jsonl index 1fa63b0003..351361be62 100644 --- a/src/daft-json/test/iris_tiny_conflicting_dtypes.jsonl +++ b/src/daft-json/test/iris_tiny_conflicting_dtypes.jsonl @@ -1,2 +1,2 @@ {"sepalLength": 5.1, "sepalWidth": false, "petalLength": 3, "petalWidth": 3, "species": "setosa"} -{"sepalLength": "foo", "sepalWidth": 3.0, "petalLength": "bar", "petalWidth": 0.2, "species": false} \ No newline at end of file +{"sepalLength": "foo", "sepalWidth": 3.0, "petalLength": "bar", "petalWidth": 0.2, "species": false} diff --git a/src/daft-json/test/iris_tiny_nulls.jsonl b/src/daft-json/test/iris_tiny_nulls.jsonl index 63220fcce0..71dd3eb231 100644 --- a/src/daft-json/test/iris_tiny_nulls.jsonl +++ b/src/daft-json/test/iris_tiny_nulls.jsonl @@ -3,4 +3,4 @@ {"sepalLength": 4.7, "sepalWidth": 3.2, "petalLength": null, "petalWidth": 0.2, "species": "setosa"} {"sepalLength": 4.6, "sepalWidth": 3.1, "petalLength": 1.5, "petalWidth": null, "species": "setosa"} {"sepalLength": 5.0, "sepalWidth": 3.6, "petalLength": 1.4, "petalWidth": 0.2, "species": null} -{"sepalLength": null, "sepalWidth": null, "petalLength": null, "petalWidth": null, "species": null} \ No newline at end of file +{"sepalLength": null, "sepalWidth": null, "petalLength": null, "petalWidth": null, "species": null} diff --git a/src/daft-micropartition/Cargo.toml b/src/daft-micropartition/Cargo.toml index f048544f14..5c7e026dc2 100644 --- a/src/daft-micropartition/Cargo.toml +++ b/src/daft-micropartition/Cargo.toml @@ -5,8 +5,8 @@ common-error = {path = "../common/error", default-features = false} daft-core = {path = "../daft-core", default-features = false} daft-csv = {path = "../daft-csv", default-features = false} daft-dsl = {path = "../daft-dsl", default-features = false} -daft-json = {path = "../daft-json", default-features = false} daft-io = {path = "../daft-io", default-features = false} +daft-json = {path = "../daft-json", default-features = false} daft-parquet = {path = "../daft-parquet", default-features = false} daft-scan = {path = "../daft-scan", default-features = false} daft-stats = {path = "../daft-stats", default-features = false} diff --git a/src/daft-plan/src/optimization/optimizer.rs b/src/daft-plan/src/optimization/optimizer.rs index d020c0eb99..72670e77b1 100644 --- a/src/daft-plan/src/optimization/optimizer.rs +++ b/src/daft-plan/src/optimization/optimizer.rs @@ -515,14 +515,16 @@ mod tests { ], OptimizerConfig::new(20), ); + let fields = vec![Field::new("a", DataType::Int64)]; let proj_exprs = vec![ col("a") + lit(1), (col("a") + lit(2)).alias("b"), (col("a") + lit(3)).alias("c"), ]; - let plan = dummy_scan_node(vec![Field::new("a", DataType::Int64)]) + let filter_predicate = col("a").lt(&lit(2)); + let plan = dummy_scan_node(fields.clone()) .project(proj_exprs, Default::default())? - .filter(col("a").lt(&lit(2)))? + .filter(filter_predicate)? .build(); let mut pass_count = 0; let mut did_transform = false; @@ -536,7 +538,7 @@ mod tests { let expected = "\ Filter: [[[col(a) < lit(2)] | lit(false)] | lit(false)] & lit(true)\ \n Project: col(a) + lit(3) AS c, col(a) + lit(1), col(a) + lit(2) AS b\ - \n Source: Json, File paths = [/foo], File schema = a (Int64), Format-specific config = Json(JsonSourceConfig), Storage config = Native(NativeStorageConfig { io_config: None, multithreaded_io: true }), Output schema = a (Int64)"; + \n Source: Json, File paths = [/foo], File schema = a (Int64), Format-specific config = Json(JsonSourceConfig { buffer_size: None, chunk_size: None }), Storage config = Native(NativeStorageConfig { io_config: None, multithreaded_io: true }), Output schema = a (Int64)"; assert_eq!(opt_plan.repr_indent(), expected); Ok(()) } diff --git a/src/daft-plan/src/optimization/rules/drop_repartition.rs b/src/daft-plan/src/optimization/rules/drop_repartition.rs index 02db0ffb1f..0593151c07 100644 --- a/src/daft-plan/src/optimization/rules/drop_repartition.rs +++ b/src/daft-plan/src/optimization/rules/drop_repartition.rs @@ -98,7 +98,7 @@ mod tests { .build(); let expected = "\ Repartition: Scheme = Hash, Number of partitions = 5, Partition by = col(a)\ - \n Source: Json, File paths = [/foo], File schema = a (Int64), b (Utf8), Format-specific config = Json(JsonSourceConfig), Storage config = Native(NativeStorageConfig { io_config: None, multithreaded_io: true }), Output schema = a (Int64), b (Utf8)"; + \n Source: Json, File paths = [/foo], File schema = a (Int64), b (Utf8), Format-specific config = Json(JsonSourceConfig { buffer_size: None, chunk_size: None }), Storage config = Native(NativeStorageConfig { io_config: None, multithreaded_io: true }), Output schema = a (Int64), b (Utf8)"; assert_optimized_plan_eq(plan, expected)?; Ok(()) } diff --git a/src/daft-plan/src/optimization/rules/push_down_filter.rs b/src/daft-plan/src/optimization/rules/push_down_filter.rs index 0bd738e574..dbe7c72102 100644 --- a/src/daft-plan/src/optimization/rules/push_down_filter.rs +++ b/src/daft-plan/src/optimization/rules/push_down_filter.rs @@ -261,7 +261,7 @@ mod tests { .build(); let expected = "\ Filter: [col(b) == lit(\"foo\")] & [col(a) < lit(2)]\ - \n Source: Json, File paths = [/foo], File schema = a (Int64), b (Utf8), Format-specific config = Json(JsonSourceConfig), Storage config = Native(NativeStorageConfig { io_config: None, multithreaded_io: true }), Output schema = a (Int64), b (Utf8)"; + \n Source: Json, File paths = [/foo], File schema = a (Int64), b (Utf8), Format-specific config = Json(JsonSourceConfig { buffer_size: None, chunk_size: None }), Storage config = Native(NativeStorageConfig { io_config: None, multithreaded_io: true }), Output schema = a (Int64), b (Utf8)"; assert_optimized_plan_eq(plan, expected)?; Ok(()) } @@ -279,7 +279,7 @@ mod tests { let expected = "\ Project: col(a)\ \n Filter: col(a) < lit(2)\ - \n Source: Json, File paths = [/foo], File schema = a (Int64), b (Utf8), Format-specific config = Json(JsonSourceConfig), Storage config = Native(NativeStorageConfig { io_config: None, multithreaded_io: true }), Output schema = a (Int64), b (Utf8)"; + \n Source: Json, File paths = [/foo], File schema = a (Int64), b (Utf8), Format-specific config = Json(JsonSourceConfig { buffer_size: None, chunk_size: None }), Storage config = Native(NativeStorageConfig { io_config: None, multithreaded_io: true }), Output schema = a (Int64), b (Utf8)"; assert_optimized_plan_eq(plan, expected)?; Ok(()) } @@ -297,7 +297,7 @@ mod tests { let expected = "\ Project: col(a), col(b)\ \n Filter: [col(a) < lit(2)] & [col(b) == lit(\"foo\")]\ - \n Source: Json, File paths = [/foo], File schema = a (Int64), b (Utf8), Format-specific config = Json(JsonSourceConfig), Storage config = Native(NativeStorageConfig { io_config: None, multithreaded_io: true }), Output schema = a (Int64), b (Utf8)"; + \n Source: Json, File paths = [/foo], File schema = a (Int64), b (Utf8), Format-specific config = Json(JsonSourceConfig { buffer_size: None, chunk_size: None }), Storage config = Native(NativeStorageConfig { io_config: None, multithreaded_io: true }), Output schema = a (Int64), b (Utf8)"; assert_optimized_plan_eq(plan, expected)?; Ok(()) } @@ -317,7 +317,7 @@ mod tests { let expected = "\ Filter: col(a) < lit(2)\ \n Project: col(a) + lit(1)\ - \n Source: Json, File paths = [/foo], File schema = a (Int64), b (Utf8), Format-specific config = Json(JsonSourceConfig), Storage config = Native(NativeStorageConfig { io_config: None, multithreaded_io: true }), Output schema = a (Int64), b (Utf8)"; + \n Source: Json, File paths = [/foo], File schema = a (Int64), b (Utf8), Format-specific config = Json(JsonSourceConfig { buffer_size: None, chunk_size: None }), Storage config = Native(NativeStorageConfig { io_config: None, multithreaded_io: true }), Output schema = a (Int64), b (Utf8)"; assert_optimized_plan_eq(plan, expected)?; Ok(()) } @@ -338,7 +338,7 @@ mod tests { let expected = "\ Project: col(a) + lit(1)\ \n Filter: [col(a) + lit(1)] < lit(2)\ - \n Source: Json, File paths = [/foo], File schema = a (Int64), b (Utf8), Format-specific config = Json(JsonSourceConfig), Storage config = Native(NativeStorageConfig { io_config: None, multithreaded_io: true }), Output schema = a (Int64), b (Utf8)"; + \n Source: Json, File paths = [/foo], File schema = a (Int64), b (Utf8), Format-specific config = Json(JsonSourceConfig { buffer_size: None, chunk_size: None }), Storage config = Native(NativeStorageConfig { io_config: None, multithreaded_io: true }), Output schema = a (Int64), b (Utf8)"; assert_optimized_plan_eq(plan, expected)?; Ok(()) } @@ -356,7 +356,7 @@ mod tests { let expected = "\ Sort: Sort by = (col(a), descending)\ \n Filter: col(a) < lit(2)\ - \n Source: Json, File paths = [/foo], File schema = a (Int64), b (Utf8), Format-specific config = Json(JsonSourceConfig), Storage config = Native(NativeStorageConfig { io_config: None, multithreaded_io: true }), Output schema = a (Int64), b (Utf8)"; + \n Source: Json, File paths = [/foo], File schema = a (Int64), b (Utf8), Format-specific config = Json(JsonSourceConfig { buffer_size: None, chunk_size: None }), Storage config = Native(NativeStorageConfig { io_config: None, multithreaded_io: true }), Output schema = a (Int64), b (Utf8)"; // TODO(Clark): For tests in which we only care about reordering of operators, maybe switch to a form that leverages the single-node display? // let expected = format!("{sort}\n {filter}\n {source}"); assert_optimized_plan_eq(plan, expected)?; @@ -376,7 +376,7 @@ mod tests { let expected = "\ Repartition: Scheme = Hash, Number of partitions = 1, Partition by = col(a)\ \n Filter: col(a) < lit(2)\ - \n Source: Json, File paths = [/foo], File schema = a (Int64), b (Utf8), Format-specific config = Json(JsonSourceConfig), Storage config = Native(NativeStorageConfig { io_config: None, multithreaded_io: true }), Output schema = a (Int64), b (Utf8)"; + \n Source: Json, File paths = [/foo], File schema = a (Int64), b (Utf8), Format-specific config = Json(JsonSourceConfig { buffer_size: None, chunk_size: None }), Storage config = Native(NativeStorageConfig { io_config: None, multithreaded_io: true }), Output schema = a (Int64), b (Utf8)"; assert_optimized_plan_eq(plan, expected)?; Ok(()) } @@ -395,9 +395,9 @@ mod tests { let expected = "\ Concat\ \n Filter: col(a) < lit(2)\ - \n Source: Json, File paths = [/foo], File schema = a (Int64), b (Utf8), Format-specific config = Json(JsonSourceConfig), Storage config = Native(NativeStorageConfig { io_config: None, multithreaded_io: true }), Output schema = a (Int64), b (Utf8)\ + \n Source: Json, File paths = [/foo], File schema = a (Int64), b (Utf8), Format-specific config = Json(JsonSourceConfig { buffer_size: None, chunk_size: None }), Storage config = Native(NativeStorageConfig { io_config: None, multithreaded_io: true }), Output schema = a (Int64), b (Utf8)\ \n Filter: col(a) < lit(2)\ - \n Source: Json, File paths = [/foo], File schema = a (Int64), b (Utf8), Format-specific config = Json(JsonSourceConfig), Storage config = Native(NativeStorageConfig { io_config: None, multithreaded_io: true }), Output schema = a (Int64), b (Utf8)"; + \n Source: Json, File paths = [/foo], File schema = a (Int64), b (Utf8), Format-specific config = Json(JsonSourceConfig { buffer_size: None, chunk_size: None }), Storage config = Native(NativeStorageConfig { io_config: None, multithreaded_io: true }), Output schema = a (Int64), b (Utf8)"; assert_optimized_plan_eq(plan, expected)?; Ok(()) } @@ -423,8 +423,8 @@ mod tests { let expected = "\ Join: Type = Inner, On = col(b), Output schema = a (Int64), b (Utf8), c (Float64)\ \n Filter: col(a) < lit(2)\ - \n Source: Json, File paths = [/foo], File schema = a (Int64), b (Utf8), Format-specific config = Json(JsonSourceConfig), Storage config = Native(NativeStorageConfig { io_config: None, multithreaded_io: true }), Output schema = a (Int64), b (Utf8)\ - \n Source: Json, File paths = [/foo], File schema = b (Utf8), c (Float64), Format-specific config = Json(JsonSourceConfig), Storage config = Native(NativeStorageConfig { io_config: None, multithreaded_io: true }), Output schema = b (Utf8), c (Float64)"; + \n Source: Json, File paths = [/foo], File schema = a (Int64), b (Utf8), Format-specific config = Json(JsonSourceConfig { buffer_size: None, chunk_size: None }), Storage config = Native(NativeStorageConfig { io_config: None, multithreaded_io: true }), Output schema = a (Int64), b (Utf8)\ + \n Source: Json, File paths = [/foo], File schema = b (Utf8), c (Float64), Format-specific config = Json(JsonSourceConfig { buffer_size: None, chunk_size: None }), Storage config = Native(NativeStorageConfig { io_config: None, multithreaded_io: true }), Output schema = b (Utf8), c (Float64)"; assert_optimized_plan_eq(plan, expected)?; Ok(()) } @@ -449,9 +449,9 @@ mod tests { .build(); let expected = "\ Join: Type = Inner, On = col(b), Output schema = a (Int64), b (Utf8), c (Float64)\ - \n Source: Json, File paths = [/foo], File schema = a (Int64), b (Utf8), Format-specific config = Json(JsonSourceConfig), Storage config = Native(NativeStorageConfig { io_config: None, multithreaded_io: true }), Output schema = a (Int64), b (Utf8)\ + \n Source: Json, File paths = [/foo], File schema = a (Int64), b (Utf8), Format-specific config = Json(JsonSourceConfig { buffer_size: None, chunk_size: None }), Storage config = Native(NativeStorageConfig { io_config: None, multithreaded_io: true }), Output schema = a (Int64), b (Utf8)\ \n Filter: col(c) < lit(2.0)\ - \n Source: Json, File paths = [/foo], File schema = b (Utf8), c (Float64), Format-specific config = Json(JsonSourceConfig), Storage config = Native(NativeStorageConfig { io_config: None, multithreaded_io: true }), Output schema = b (Utf8), c (Float64)"; + \n Source: Json, File paths = [/foo], File schema = b (Utf8), c (Float64), Format-specific config = Json(JsonSourceConfig { buffer_size: None, chunk_size: None }), Storage config = Native(NativeStorageConfig { io_config: None, multithreaded_io: true }), Output schema = b (Utf8), c (Float64)"; assert_optimized_plan_eq(plan, expected)?; Ok(()) } @@ -475,9 +475,9 @@ mod tests { let expected = "\ Join: Type = Inner, On = col(b), Output schema = a (Int64), b (Int64), c (Float64)\ \n Filter: col(b) < lit(2)\ - \n Source: Json, File paths = [/foo], File schema = a (Int64), b (Int64), c (Float64), Format-specific config = Json(JsonSourceConfig), Storage config = Native(NativeStorageConfig { io_config: None, multithreaded_io: true }), Output schema = a (Int64), b (Int64), c (Float64)\ + \n Source: Json, File paths = [/foo], File schema = a (Int64), b (Int64), c (Float64), Format-specific config = Json(JsonSourceConfig { buffer_size: None, chunk_size: None }), Storage config = Native(NativeStorageConfig { io_config: None, multithreaded_io: true }), Output schema = a (Int64), b (Int64), c (Float64)\ \n Filter: col(b) < lit(2)\ - \n Source: Json, File paths = [/foo], File schema = b (Int64), Format-specific config = Json(JsonSourceConfig), Storage config = Native(NativeStorageConfig { io_config: None, multithreaded_io: true }), Output schema = b (Int64)"; + \n Source: Json, File paths = [/foo], File schema = b (Int64), Format-specific config = Json(JsonSourceConfig { buffer_size: None, chunk_size: None }), Storage config = Native(NativeStorageConfig { io_config: None, multithreaded_io: true }), Output schema = b (Int64)"; assert_optimized_plan_eq(plan, expected)?; Ok(()) } diff --git a/src/daft-plan/src/optimization/rules/push_down_limit.rs b/src/daft-plan/src/optimization/rules/push_down_limit.rs index 457dcbe830..a9706db4f1 100644 --- a/src/daft-plan/src/optimization/rules/push_down_limit.rs +++ b/src/daft-plan/src/optimization/rules/push_down_limit.rs @@ -87,6 +87,7 @@ mod tests { use common_error::DaftResult; use daft_core::{datatypes::Field, schema::Schema, DataType}; use daft_dsl::col; + use daft_scan::Pushdowns; use std::sync::Arc; #[cfg(feature = "python")] @@ -98,7 +99,7 @@ mod tests { rules::PushDownLimit, Optimizer, }, - test::{dummy_scan_node, dummy_scan_node_with_limit}, + test::{dummy_scan_node, dummy_scan_node_with_pushdowns}, LogicalPlan, PartitionScheme, }; @@ -139,7 +140,7 @@ mod tests { .build(); let expected = "\ Limit: 5\ - \n Source: Json, File paths = [/foo], File schema = a (Int64), b (Utf8), Format-specific config = Json(JsonSourceConfig), Storage config = Native(NativeStorageConfig { io_config: None, multithreaded_io: true }), Limit pushdown = 5, Output schema = a (Int64), b (Utf8)"; + \n Source: Json, File paths = [/foo], File schema = a (Int64), b (Utf8), Format-specific config = Json(JsonSourceConfig { buffer_size: None, chunk_size: None }), Storage config = Native(NativeStorageConfig { io_config: None, multithreaded_io: true }), Limit pushdown = 5, Output schema = a (Int64), b (Utf8)"; assert_optimized_plan_eq(plan, expected)?; Ok(()) } @@ -149,18 +150,18 @@ mod tests { /// Limit-Source[existing_limit] -> Source[existing_limit] #[test] fn limit_does_not_push_into_external_source_if_smaller_limit() -> DaftResult<()> { - let plan = dummy_scan_node_with_limit( + let plan = dummy_scan_node_with_pushdowns( vec![ Field::new("a", DataType::Int64), Field::new("b", DataType::Utf8), ], - Some(3), + Pushdowns::default().with_limit(Some(3)), ) .limit(5, false)? .build(); let expected = "\ Limit: 5\ - \n Source: Json, File paths = [/foo], File schema = a (Int64), b (Utf8), Format-specific config = Json(JsonSourceConfig), Storage config = Native(NativeStorageConfig { io_config: None, multithreaded_io: true }), Limit pushdown = 3, Output schema = a (Int64), b (Utf8)"; + \n Source: Json, File paths = [/foo], File schema = a (Int64), b (Utf8), Format-specific config = Json(JsonSourceConfig { buffer_size: None, chunk_size: None }), Storage config = Native(NativeStorageConfig { io_config: None, multithreaded_io: true }), Limit pushdown = 3, Output schema = a (Int64), b (Utf8)"; assert_optimized_plan_eq(plan, expected)?; Ok(()) } @@ -170,18 +171,18 @@ mod tests { /// Limit-Source[existing_limit] -> Source[new_limit] #[test] fn limit_does_push_into_external_source_if_larger_limit() -> DaftResult<()> { - let plan = dummy_scan_node_with_limit( + let plan = dummy_scan_node_with_pushdowns( vec![ Field::new("a", DataType::Int64), Field::new("b", DataType::Utf8), ], - Some(10), + Pushdowns::default().with_limit(Some(10)), ) .limit(5, false)? .build(); let expected = "\ Limit: 5\ - \n Source: Json, File paths = [/foo], File schema = a (Int64), b (Utf8), Format-specific config = Json(JsonSourceConfig), Storage config = Native(NativeStorageConfig { io_config: None, multithreaded_io: true }), Limit pushdown = 5, Output schema = a (Int64), b (Utf8)"; + \n Source: Json, File paths = [/foo], File schema = a (Int64), b (Utf8), Format-specific config = Json(JsonSourceConfig { buffer_size: None, chunk_size: None }), Storage config = Native(NativeStorageConfig { io_config: None, multithreaded_io: true }), Limit pushdown = 5, Output schema = a (Int64), b (Utf8)"; assert_optimized_plan_eq(plan, expected)?; Ok(()) } @@ -199,7 +200,7 @@ mod tests { .build(); let expected = "\ Limit: 5\ - \n . Source: Json, File paths = [/foo], File schema = a (Int64), b (Utf8), Format-specific config = Json(JsonSourceConfig), Storage config = Native(NativeStorageConfig { io_config: None, multithreaded_io: true }), Output schema = a (Int64), b (Utf8)"; + \n . Source: Json, File paths = [/foo], File schema = a (Int64), b (Utf8), Format-specific config = Json(JsonSourceConfig { buffer_size: None, chunk_size: None }), Storage config = Native(NativeStorageConfig { io_config: None, multithreaded_io: true }), Output schema = a (Int64), b (Utf8)"; assert_optimized_plan_eq(plan, expected)?; Ok(()) } @@ -219,7 +220,7 @@ mod tests { let expected = "\ Repartition: Scheme = Hash, Number of partitions = 1, Partition by = col(a)\ \n Limit: 5\ - \n Source: Json, File paths = [/foo], File schema = a (Int64), b (Utf8), Format-specific config = Json(JsonSourceConfig), Storage config = Native(NativeStorageConfig { io_config: None, multithreaded_io: true }), Limit pushdown = 5, Output schema = a (Int64), b (Utf8)"; + \n Source: Json, File paths = [/foo], File schema = a (Int64), b (Utf8), Format-specific config = Json(JsonSourceConfig { buffer_size: None, chunk_size: None }), Storage config = Native(NativeStorageConfig { io_config: None, multithreaded_io: true }), Limit pushdown = 5, Output schema = a (Int64), b (Utf8)"; assert_optimized_plan_eq(plan, expected)?; Ok(()) } @@ -239,7 +240,7 @@ mod tests { let expected = "\ Project: col(a)\ \n Limit: 5\ - \n Source: Json, File paths = [/foo], File schema = a (Int64), b (Utf8), Format-specific config = Json(JsonSourceConfig), Storage config = Native(NativeStorageConfig { io_config: None, multithreaded_io: true }), Limit pushdown = 5, Output schema = a (Int64), b (Utf8)"; + \n Source: Json, File paths = [/foo], File schema = a (Int64), b (Utf8), Format-specific config = Json(JsonSourceConfig { buffer_size: None, chunk_size: None }), Storage config = Native(NativeStorageConfig { io_config: None, multithreaded_io: true }), Limit pushdown = 5, Output schema = a (Int64), b (Utf8)"; assert_optimized_plan_eq(plan, expected)?; Ok(()) } diff --git a/src/daft-plan/src/optimization/rules/push_down_projection.rs b/src/daft-plan/src/optimization/rules/push_down_projection.rs index edd04dab16..b8af4b559f 100644 --- a/src/daft-plan/src/optimization/rules/push_down_projection.rs +++ b/src/daft-plan/src/optimization/rules/push_down_projection.rs @@ -573,7 +573,7 @@ mod tests { let expected = "\ Project: [col(a) + lit(1)] + lit(3), col(b) + lit(2), col(a) + lit(4)\ - \n Source: Json, File paths = [/foo], File schema = a (Int64), b (Int64), Format-specific config = Json(JsonSourceConfig), Storage config = Native(NativeStorageConfig { io_config: None, multithreaded_io: true }), Output schema = a (Int64), b (Int64)"; + \n Source: Json, File paths = [/foo], File schema = a (Int64), b (Int64), Format-specific config = Json(JsonSourceConfig { buffer_size: None, chunk_size: None }), Storage config = Native(NativeStorageConfig { io_config: None, multithreaded_io: true }), Output schema = a (Int64), b (Int64)"; assert_optimized_plan_eq(unoptimized, expected)?; Ok(()) } @@ -589,7 +589,7 @@ mod tests { .build(); let expected = "\ - Source: Json, File paths = [/foo], File schema = a (Int64), b (Int64), Format-specific config = Json(JsonSourceConfig), Storage config = Native(NativeStorageConfig { io_config: None, multithreaded_io: true }), Output schema = a (Int64), b (Int64)"; + Source: Json, File paths = [/foo], File schema = a (Int64), b (Int64), Format-specific config = Json(JsonSourceConfig { buffer_size: None, chunk_size: None }), Storage config = Native(NativeStorageConfig { io_config: None, multithreaded_io: true }), Output schema = a (Int64), b (Int64)"; assert_optimized_plan_eq(unoptimized, expected)?; Ok(()) @@ -606,7 +606,7 @@ mod tests { let expected = "\ Project: col(b), col(a)\ - \n Source: Json, File paths = [/foo], File schema = a (Int64), b (Int64), Format-specific config = Json(JsonSourceConfig), Storage config = Native(NativeStorageConfig { io_config: None, multithreaded_io: true }), Output schema = a (Int64), b (Int64)"; + \n Source: Json, File paths = [/foo], File schema = a (Int64), b (Int64), Format-specific config = Json(JsonSourceConfig { buffer_size: None, chunk_size: None }), Storage config = Native(NativeStorageConfig { io_config: None, multithreaded_io: true }), Output schema = a (Int64), b (Int64)"; assert_optimized_plan_eq(unoptimized, expected)?; Ok(()) @@ -624,7 +624,7 @@ mod tests { let expected = "\ Project: col(b) + lit(3)\ - \n Source: Json, File paths = [/foo], File schema = a (Int64), b (Int64), Format-specific config = Json(JsonSourceConfig), Storage config = Native(NativeStorageConfig { io_config: None, multithreaded_io: true }), Projection pushdown = [b], Output schema = b (Int64)"; + \n Source: Json, File paths = [/foo], File schema = a (Int64), b (Int64), Format-specific config = Json(JsonSourceConfig { buffer_size: None, chunk_size: None }), Storage config = Native(NativeStorageConfig { io_config: None, multithreaded_io: true }), Projection pushdown = [b], Output schema = b (Int64)"; assert_optimized_plan_eq(unoptimized, expected)?; Ok(()) @@ -650,7 +650,7 @@ mod tests { let expected = "\ Project: col(a), col(b), col(b) AS c\ \n Project: col(b) + lit(3), col(a)\ - \n Source: Json, File paths = [/foo], File schema = a (Int64), b (Int64), Format-specific config = Json(JsonSourceConfig), Storage config = Native(NativeStorageConfig { io_config: None, multithreaded_io: true }), Output schema = a (Int64), b (Int64)"; + \n Source: Json, File paths = [/foo], File schema = a (Int64), b (Int64), Format-specific config = Json(JsonSourceConfig { buffer_size: None, chunk_size: None }), Storage config = Native(NativeStorageConfig { io_config: None, multithreaded_io: true }), Output schema = a (Int64), b (Int64)"; assert_optimized_plan_eq(unoptimized, expected)?; Ok(()) @@ -671,7 +671,7 @@ mod tests { let expected = "\ Project: col(a)\ \n Aggregation: mean(col(a)), Group by = col(c), Output schema = c (Int64), a (Float64)\ - \n Source: Json, File paths = [/foo], File schema = a (Int64), b (Int64), c (Int64), Format-specific config = Json(JsonSourceConfig), Storage config = Native(NativeStorageConfig { io_config: None, multithreaded_io: true }), Projection pushdown = [a, c], Output schema = a (Int64), c (Int64)"; + \n Source: Json, File paths = [/foo], File schema = a (Int64), b (Int64), c (Int64), Format-specific config = Json(JsonSourceConfig { buffer_size: None, chunk_size: None }), Storage config = Native(NativeStorageConfig { io_config: None, multithreaded_io: true }), Projection pushdown = [a, c], Output schema = a (Int64), c (Int64)"; assert_optimized_plan_eq(unoptimized, expected)?; Ok(()) @@ -692,7 +692,7 @@ mod tests { let expected = "\ Project: col(a)\ \n Filter: col(b)\ - \n Source: Json, File paths = [/foo], File schema = a (Int64), b (Boolean), c (Int64), Format-specific config = Json(JsonSourceConfig), Storage config = Native(NativeStorageConfig { io_config: None, multithreaded_io: true }), Projection pushdown = [a, b], Output schema = a (Int64), b (Boolean)"; + \n Source: Json, File paths = [/foo], File schema = a (Int64), b (Boolean), c (Int64), Format-specific config = Json(JsonSourceConfig { buffer_size: None, chunk_size: None }), Storage config = Native(NativeStorageConfig { io_config: None, multithreaded_io: true }), Projection pushdown = [a, b], Output schema = a (Int64), b (Boolean)"; assert_optimized_plan_eq(unoptimized, expected)?; Ok(()) diff --git a/src/daft-plan/src/test/mod.rs b/src/daft-plan/src/test/mod.rs index ae652b3c6e..1039f84ea1 100644 --- a/src/daft-plan/src/test/mod.rs +++ b/src/daft-plan/src/test/mod.rs @@ -7,25 +7,21 @@ use crate::{builder::LogicalPlanBuilder, source_info::FileInfos, NativeStorageCo /// Create a dummy scan node containing the provided fields in its schema. pub fn dummy_scan_node(fields: Vec) -> LogicalPlanBuilder { - let schema = Arc::new(Schema::new(fields).unwrap()); - LogicalPlanBuilder::table_scan( - FileInfos::new_internal(vec!["/foo".to_string()], vec![None], vec![None]), - schema, - FileFormatConfig::Json(Default::default()).into(), - StorageConfig::Native(NativeStorageConfig::new_internal(true, None).into()).into(), - ) - .unwrap() + dummy_scan_node_with_pushdowns(fields, Default::default()) } /// Create a dummy scan node containing the provided fields in its schema and the provided limit. -pub fn dummy_scan_node_with_limit(fields: Vec, limit: Option) -> LogicalPlanBuilder { +pub fn dummy_scan_node_with_pushdowns( + fields: Vec, + pushdowns: Pushdowns, +) -> LogicalPlanBuilder { let schema = Arc::new(Schema::new(fields).unwrap()); LogicalPlanBuilder::table_scan_with_pushdowns( FileInfos::new_internal(vec!["/foo".to_string()], vec![None], vec![None]), schema, FileFormatConfig::Json(Default::default()).into(), StorageConfig::Native(NativeStorageConfig::new_internal(true, None).into()).into(), - Pushdowns::new(None, None, limit), + pushdowns, ) .unwrap() } diff --git a/tests/table/table_io/test_json.py b/tests/table/table_io/test_json.py index 761b10656b..b6bdf88608 100644 --- a/tests/table/table_io/test_json.py +++ b/tests/table/table_io/test_json.py @@ -1,17 +1,16 @@ from __future__ import annotations -import contextlib -import io +import contextlib import json import os import pathlib import tempfile from typing import Any -from daft.daft import NativeStorageConfig, PythonStorageConfig, StorageConfig import pytest import daft +from daft.daft import NativeStorageConfig, PythonStorageConfig, StorageConfig from daft.datatype import DataType from daft.logical.schema import Schema from daft.runners.partitioning import TableReadOptions