From 96c538b21483f8bbd1b34edc2219022fef2c4149 Mon Sep 17 00:00:00 2001 From: Colin Ho Date: Mon, 4 Nov 2024 16:57:18 -0800 Subject: [PATCH] [CHORE] Swordfish specific test fixtures (#3164) This PR sets up a few swordfish-related test fixtures, specifically: - Parameterize `default_morsel_size = [1, None]` for dataframe tests that do any into/repartitioning. This makes sure that operator parallelism is exercised. - Set up iteration tests in `test_iter.py` - Make the ordering assertions stricter on some tests. E.g. some tests do `assert df.sort(col) == expected`, but there are other columns in df that may not be sorted, and sorting on a single column isn't enough when morsel_size = 1. This isn't a problem with swordfish but with the tests, which should sort on more columns. Big note: There was a problem with pivot not getting applied correctly. This is because a dataframe pivot operation consists of an agg + the actual pivoting, but previously the pivot was implemented as an intermediate operator, and the results of the agg were getting buffered. In order for the pivot to work, it has to receive all values with the same group_by keys. This PR simplifies Pivot by implementing it as a BlockingSink, so all of the work happens there. --------- Co-authored-by: Colin Ho --- .../src/intermediate_ops/mod.rs | 1 - .../src/intermediate_ops/pivot.rs | 57 -------- src/daft-local-execution/src/pipeline.rs | 13 +- src/daft-local-execution/src/sinks/mod.rs | 1 + src/daft-local-execution/src/sinks/pivot.rs | 126 ++++++++++++++++++ src/daft-physical-plan/src/local_plan.rs | 3 + src/daft-physical-plan/src/translate.rs | 23 +--- tests/conftest.py | 11 ++ tests/cookbook/conftest.py | 5 +- tests/cookbook/test_aggregations.py | 34 ++--- tests/cookbook/test_computations.py | 10 +- tests/cookbook/test_count_rows.py | 8 +- tests/cookbook/test_dataloading.py | 2 +- tests/cookbook/test_distinct.py | 2 +- tests/cookbook/test_filter.py | 6 +- tests/cookbook/test_image.py | 6 +- tests/cookbook/test_joins.py | 12 +- tests/cookbook/test_pandas_cookbook.py | 28 ++-- tests/cookbook/test_sorting.py | 14 +- tests/cookbook/test_write.py | 36 ++--- tests/dataframe/test_aggregations.py | 28 ++-- tests/dataframe/test_approx_count_distinct.py | 6 +- tests/dataframe/test_concat.py | 6 +- tests/dataframe/test_distinct.py | 6 +- tests/dataframe/test_iter.py | 97 +++++++------- tests/dataframe/test_joins.py | 56 ++++---- tests/dataframe/test_map_groups.py | 12 +- tests/dataframe/test_pivot.py | 16 +-- tests/dataframe/test_sort.py | 20 +-- tests/dataframe/test_stddev.py | 8 +- tests/dataframe/test_unpivot.py | 40 +++--- 31 files changed, 391 insertions(+), 302 deletions(-) delete mode 100644 src/daft-local-execution/src/intermediate_ops/pivot.rs create mode 100644 src/daft-local-execution/src/sinks/pivot.rs diff --git a/src/daft-local-execution/src/intermediate_ops/mod.rs b/src/daft-local-execution/src/intermediate_ops/mod.rs index 336f6ebc92..f512343e96 100644 --- a/src/daft-local-execution/src/intermediate_ops/mod.rs +++ b/src/daft-local-execution/src/intermediate_ops/mod.rs @@ -5,7 +5,6 @@ pub mod explode; pub mod filter; pub mod inner_hash_join_probe; pub mod intermediate_op; -pub mod pivot; pub mod project; pub mod sample; pub mod unpivot; diff --git a/src/daft-local-execution/src/intermediate_ops/pivot.rs b/src/daft-local-execution/src/intermediate_ops/pivot.rs deleted file mode 100644 index afac5f9b02..0000000000 --- a/src/daft-local-execution/src/intermediate_ops/pivot.rs +++ /dev/null @@ -1,57 +0,0 @@ -use std::sync::Arc; - -use 
common_error::DaftResult; -use daft_dsl::ExprRef; -use tracing::instrument; - -use super::intermediate_op::{ - IntermediateOperator, IntermediateOperatorResult, IntermediateOperatorState, -}; -use crate::pipeline::PipelineResultType; - -pub struct PivotOperator { - group_by: Vec, - pivot_col: ExprRef, - values_col: ExprRef, - names: Vec, -} - -impl PivotOperator { - pub fn new( - group_by: Vec, - pivot_col: ExprRef, - values_col: ExprRef, - names: Vec, - ) -> Self { - Self { - group_by, - pivot_col, - values_col, - names, - } - } -} - -impl IntermediateOperator for PivotOperator { - #[instrument(skip_all, name = "PivotOperator::execute")] - fn execute( - &self, - _idx: usize, - input: &PipelineResultType, - _state: &IntermediateOperatorState, - ) -> DaftResult { - let out = input.as_data().pivot( - &self.group_by, - self.pivot_col.clone(), - self.values_col.clone(), - self.names.clone(), - )?; - Ok(IntermediateOperatorResult::NeedMoreInput(Some(Arc::new( - out, - )))) - } - - fn name(&self) -> &'static str { - "PivotOperator" - } -} diff --git a/src/daft-local-execution/src/pipeline.rs b/src/daft-local-execution/src/pipeline.rs index b371af0c8a..f15ab50543 100644 --- a/src/daft-local-execution/src/pipeline.rs +++ b/src/daft-local-execution/src/pipeline.rs @@ -28,8 +28,8 @@ use crate::{ actor_pool_project::ActorPoolProjectOperator, aggregate::AggregateOperator, anti_semi_hash_join_probe::AntiSemiProbeOperator, explode::ExplodeOperator, filter::FilterOperator, inner_hash_join_probe::InnerHashJoinProbeOperator, - intermediate_op::IntermediateNode, pivot::PivotOperator, project::ProjectOperator, - sample::SampleOperator, unpivot::UnpivotOperator, + intermediate_op::IntermediateNode, project::ProjectOperator, sample::SampleOperator, + unpivot::UnpivotOperator, }, sinks::{ aggregate::AggregateSink, @@ -38,6 +38,7 @@ use crate::{ hash_join_build::HashJoinBuildSink, limit::LimitSink, outer_hash_join_probe::OuterHashJoinProbeSink, + pivot::PivotSink, sort::SortSink, streaming_sink::StreamingSinkNode, write::{WriteFormat, WriteSink}, @@ -282,17 +283,19 @@ pub fn physical_plan_to_pipeline( group_by, pivot_column, value_column, + aggregation, names, .. 
}) => { - let pivot_op = PivotOperator::new( + let child_node = physical_plan_to_pipeline(input, psets, cfg)?; + let pivot_sink = PivotSink::new( group_by.clone(), pivot_column.clone(), value_column.clone(), + aggregation.clone(), names.clone(), ); - let child_node = physical_plan_to_pipeline(input, psets, cfg)?; - IntermediateNode::new(Arc::new(pivot_op), vec![child_node]).boxed() + BlockingSinkNode::new(Arc::new(pivot_sink), child_node).boxed() } LocalPhysicalPlan::Sort(Sort { input, diff --git a/src/daft-local-execution/src/sinks/mod.rs b/src/daft-local-execution/src/sinks/mod.rs index 64366385c3..768427ce62 100644 --- a/src/daft-local-execution/src/sinks/mod.rs +++ b/src/daft-local-execution/src/sinks/mod.rs @@ -4,6 +4,7 @@ pub mod concat; pub mod hash_join_build; pub mod limit; pub mod outer_hash_join_probe; +pub mod pivot; pub mod sort; pub mod streaming_sink; pub mod write; diff --git a/src/daft-local-execution/src/sinks/pivot.rs b/src/daft-local-execution/src/sinks/pivot.rs new file mode 100644 index 0000000000..93cb7b3843 --- /dev/null +++ b/src/daft-local-execution/src/sinks/pivot.rs @@ -0,0 +1,126 @@ +use std::sync::Arc; + +use common_error::DaftResult; +use daft_dsl::{AggExpr, Expr, ExprRef}; +use daft_micropartition::MicroPartition; +use tracing::instrument; + +use super::blocking_sink::{BlockingSink, BlockingSinkState, BlockingSinkStatus}; +use crate::{pipeline::PipelineResultType, NUM_CPUS}; + +enum PivotState { + Accumulating(Vec>), + Done, +} + +impl PivotState { + fn push(&mut self, part: Arc) { + if let Self::Accumulating(ref mut parts) = self { + parts.push(part); + } else { + panic!("PivotSink should be in Accumulating state"); + } + } + + fn finalize(&mut self) -> Vec> { + let res = if let Self::Accumulating(ref mut parts) = self { + std::mem::take(parts) + } else { + panic!("PivotSink should be in Accumulating state"); + }; + *self = Self::Done; + res + } +} + +impl BlockingSinkState for PivotState { + fn as_any_mut(&mut self) -> &mut dyn std::any::Any { + self + } +} + +pub struct PivotSink { + pub group_by: Vec, + pub pivot_column: ExprRef, + pub value_column: ExprRef, + pub aggregation: AggExpr, + pub names: Vec, +} + +impl PivotSink { + pub fn new( + group_by: Vec, + pivot_column: ExprRef, + value_column: ExprRef, + aggregation: AggExpr, + names: Vec, + ) -> Self { + Self { + group_by, + pivot_column, + value_column, + aggregation, + names, + } + } +} + +impl BlockingSink for PivotSink { + #[instrument(skip_all, name = "PivotSink::sink")] + fn sink( + &self, + input: &Arc, + mut state: Box, + ) -> DaftResult { + state + .as_any_mut() + .downcast_mut::() + .expect("PivotSink should have PivotState") + .push(input.clone()); + Ok(BlockingSinkStatus::NeedMoreInput(state)) + } + + #[instrument(skip_all, name = "PivotSink::finalize")] + fn finalize( + &self, + states: Vec>, + ) -> DaftResult> { + let all_parts = states.into_iter().flat_map(|mut state| { + state + .as_any_mut() + .downcast_mut::() + .expect("PivotSink should have PivotState") + .finalize() + }); + let concated = MicroPartition::concat(all_parts)?; + let group_by_with_pivot = self + .group_by + .iter() + .chain(std::iter::once(&self.pivot_column)) + .cloned() + .collect::>(); + let agged = concated.agg( + &[Expr::Agg(self.aggregation.clone()).into()], + &group_by_with_pivot, + )?; + let pivoted = Arc::new(agged.pivot( + &self.group_by, + self.pivot_column.clone(), + self.value_column.clone(), + self.names.clone(), + )?); + Ok(Some(pivoted.into())) + } + + fn name(&self) -> &'static str { + "PivotSink" + 
} + + fn max_concurrency(&self) -> usize { + *NUM_CPUS + } + + fn make_state(&self) -> DaftResult> { + Ok(Box::new(PivotState::Accumulating(vec![]))) + } +} diff --git a/src/daft-physical-plan/src/local_plan.rs b/src/daft-physical-plan/src/local_plan.rs index 720b53080e..39eeca96c5 100644 --- a/src/daft-physical-plan/src/local_plan.rs +++ b/src/daft-physical-plan/src/local_plan.rs @@ -205,6 +205,7 @@ impl LocalPhysicalPlan { group_by: Vec, pivot_column: ExprRef, value_column: ExprRef, + aggregation: AggExpr, names: Vec, schema: SchemaRef, ) -> LocalPhysicalPlanRef { @@ -213,6 +214,7 @@ impl LocalPhysicalPlan { group_by, pivot_column, value_column, + aggregation, names, schema, plan_stats: PlanStats {}, @@ -438,6 +440,7 @@ pub struct Pivot { pub group_by: Vec, pub pivot_column: ExprRef, pub value_column: ExprRef, + pub aggregation: AggExpr, pub names: Vec, pub schema: SchemaRef, pub plan_stats: PlanStats, diff --git a/src/daft-physical-plan/src/translate.rs b/src/daft-physical-plan/src/translate.rs index 3e5ee5cf13..bba77357d7 100644 --- a/src/daft-physical-plan/src/translate.rs +++ b/src/daft-physical-plan/src/translate.rs @@ -1,5 +1,5 @@ use common_error::{DaftError, DaftResult}; -use daft_core::{join::JoinStrategy, prelude::Schema}; +use daft_core::join::JoinStrategy; use daft_dsl::ExprRef; use daft_plan::{JoinType, LogicalPlan, LogicalPlanRef, SourceInfo}; @@ -91,29 +91,12 @@ pub fn translate(plan: &LogicalPlanRef) -> DaftResult { } LogicalPlan::Pivot(pivot) => { let input = translate(&pivot.input)?; - let groupby_with_pivot = pivot - .group_by - .iter() - .chain(std::iter::once(&pivot.pivot_column)) - .cloned() - .collect::>(); - let aggregate_fields = groupby_with_pivot - .iter() - .map(|expr| expr.to_field(input.schema())) - .chain(std::iter::once(pivot.aggregation.to_field(input.schema()))) - .collect::>>()?; - let aggregate_schema = Schema::new(aggregate_fields)?; - let aggregate = LocalPhysicalPlan::hash_aggregate( - input, - vec![pivot.aggregation.clone(); 1], - groupby_with_pivot, - aggregate_schema.into(), - ); Ok(LocalPhysicalPlan::pivot( - aggregate, + input, pivot.group_by.clone(), pivot.pivot_column.clone(), pivot.value_column.clone(), + pivot.aggregation.clone(), pivot.names.clone(), pivot.output_schema.clone(), )) diff --git a/tests/conftest.py b/tests/conftest.py index d46f62f89e..f91dc42797 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -7,6 +7,7 @@ import pytest import daft +import daft.context from daft.table import MicroPartition # import all conftest @@ -170,3 +171,13 @@ def assert_df_equals( except AssertionError: print(f"Failed assertion for col: {col}") raise + + +@pytest.fixture( + scope="function", + params=[1, None] if daft.context.get_context().daft_execution_config.enable_native_executor else [None], +) +def with_morsel_size(request): + morsel_size = request.param + with daft.context.execution_config_ctx(default_morsel_size=morsel_size): + yield morsel_size diff --git a/tests/cookbook/conftest.py b/tests/cookbook/conftest.py index c04f7ab208..f62b74203e 100644 --- a/tests/cookbook/conftest.py +++ b/tests/cookbook/conftest.py @@ -42,7 +42,10 @@ def service_requests_csv_pd_df(): return pd.read_csv(COOKBOOK_DATA_CSV, keep_default_na=False)[COLUMNS] -@pytest.fixture(scope="module", params=[1, 2]) +@pytest.fixture( + scope="module", + params=[1, 2] if daft.context.get_context().daft_execution_config.enable_native_executor is False else [1], +) def repartition_nparts(request): """Adds a `n_repartitions` parameter to test cases which provides the number 
of partitions that the test case should repartition its dataset into for testing diff --git a/tests/cookbook/test_aggregations.py b/tests/cookbook/test_aggregations.py index 9eb2783da6..4dcfbf0cc6 100644 --- a/tests/cookbook/test_aggregations.py +++ b/tests/cookbook/test_aggregations.py @@ -12,7 +12,7 @@ from tests.conftest import assert_df_equals -def test_sum(daft_df, service_requests_csv_pd_df, repartition_nparts): +def test_sum(daft_df, service_requests_csv_pd_df, repartition_nparts, with_morsel_size): """Sums across an entire column for the entire table""" daft_df = daft_df.repartition(repartition_nparts).sum(col("Unique Key").alias("unique_key_sum")) service_requests_csv_pd_df = pd.DataFrame.from_records( @@ -22,7 +22,7 @@ def test_sum(daft_df, service_requests_csv_pd_df, repartition_nparts): assert_df_equals(daft_pd_df, service_requests_csv_pd_df, sort_key="unique_key_sum") -def test_approx_percentiles(daft_df, service_requests_csv_pd_df, repartition_nparts): +def test_approx_percentiles(daft_df, service_requests_csv_pd_df, repartition_nparts, with_morsel_size): """Computes approx percentile across an entire column for the entire table""" daft_df = daft_df.repartition(repartition_nparts).agg( col("Unique Key").alias("unique_key_median").approx_percentiles([0.25, 0.5, 0.75]) @@ -37,7 +37,7 @@ def test_approx_percentiles(daft_df, service_requests_csv_pd_df, repartition_npa ) -def test_mean(daft_df, service_requests_csv_pd_df, repartition_nparts): +def test_mean(daft_df, service_requests_csv_pd_df, repartition_nparts, with_morsel_size): """Averages across a column for entire table""" daft_df = daft_df.repartition(repartition_nparts).mean(col("Unique Key").alias("unique_key_mean")) service_requests_csv_pd_df = pd.DataFrame.from_records( @@ -47,7 +47,7 @@ def test_mean(daft_df, service_requests_csv_pd_df, repartition_nparts): assert_df_equals(daft_pd_df, service_requests_csv_pd_df, sort_key="unique_key_mean") -def test_min(daft_df, service_requests_csv_pd_df, repartition_nparts): +def test_min(daft_df, service_requests_csv_pd_df, repartition_nparts, with_morsel_size): """min across a column for entire table""" daft_df = daft_df.repartition(repartition_nparts).min(col("Unique Key").alias("unique_key_min")) service_requests_csv_pd_df = pd.DataFrame.from_records( @@ -57,7 +57,7 @@ def test_min(daft_df, service_requests_csv_pd_df, repartition_nparts): assert_df_equals(daft_pd_df, service_requests_csv_pd_df, sort_key="unique_key_min") -def test_max(daft_df, service_requests_csv_pd_df, repartition_nparts): +def test_max(daft_df, service_requests_csv_pd_df, repartition_nparts, with_morsel_size): """max across a column for entire table""" daft_df = daft_df.repartition(repartition_nparts).max(col("Unique Key").alias("unique_key_max")) service_requests_csv_pd_df = pd.DataFrame.from_records( @@ -67,7 +67,7 @@ def test_max(daft_df, service_requests_csv_pd_df, repartition_nparts): assert_df_equals(daft_pd_df, service_requests_csv_pd_df, sort_key="unique_key_max") -def test_count(daft_df, service_requests_csv_pd_df, repartition_nparts): +def test_count(daft_df, service_requests_csv_pd_df, repartition_nparts, with_morsel_size): """count a column for entire table""" daft_df = daft_df.repartition(repartition_nparts).count(col("Unique Key").alias("unique_key_count")) service_requests_csv_pd_df = pd.DataFrame.from_records( @@ -78,7 +78,7 @@ def test_count(daft_df, service_requests_csv_pd_df, repartition_nparts): assert_df_equals(daft_pd_df, service_requests_csv_pd_df, sort_key="unique_key_count") -def 
test_list(daft_df, service_requests_csv_pd_df, repartition_nparts): +def test_list(daft_df, service_requests_csv_pd_df, repartition_nparts, with_morsel_size): """list agg a column for entire table""" daft_df = daft_df.repartition(repartition_nparts).agg_list(col("Unique Key").alias("unique_key_list")).collect() unique_key_list = service_requests_csv_pd_df["Unique Key"].to_list() @@ -88,7 +88,7 @@ def test_list(daft_df, service_requests_csv_pd_df, repartition_nparts): assert set(result_list[0]) == set(unique_key_list) -def test_global_agg(daft_df, service_requests_csv_pd_df, repartition_nparts): +def test_global_agg(daft_df, service_requests_csv_pd_df, repartition_nparts, with_morsel_size): """Averages across a column for entire table""" daft_df = daft_df.repartition(repartition_nparts).agg( [ @@ -112,7 +112,7 @@ def test_global_agg(daft_df, service_requests_csv_pd_df, repartition_nparts): assert_df_equals(daft_pd_df, service_requests_csv_pd_df, sort_key="unique_key_mean") -def test_filtered_sum(daft_df, service_requests_csv_pd_df, repartition_nparts): +def test_filtered_sum(daft_df, service_requests_csv_pd_df, repartition_nparts, with_morsel_size): """Sums across an entire column for the entire table filtered by a certain condition""" daft_df = ( daft_df.repartition(repartition_nparts) @@ -139,7 +139,7 @@ def test_filtered_sum(daft_df, service_requests_csv_pd_df, repartition_nparts): pytest.param(["Borough", "Complaint Type"], id="NumGroupByKeys:2"), ], ) -def test_sum_groupby(daft_df, service_requests_csv_pd_df, repartition_nparts, keys): +def test_sum_groupby(daft_df, service_requests_csv_pd_df, repartition_nparts, keys, with_morsel_size): """Sums across groups""" daft_df = daft_df.repartition(repartition_nparts).groupby(*[col(k) for k in keys]).sum(col("Unique Key")) service_requests_csv_pd_df = service_requests_csv_pd_df.groupby(keys).sum("Unique Key").reset_index() @@ -154,7 +154,7 @@ def test_sum_groupby(daft_df, service_requests_csv_pd_df, repartition_nparts, ke pytest.param(["Borough", "Complaint Type"], id="NumGroupByKeys:2"), ], ) -def test_approx_percentile_groupby(daft_df, service_requests_csv_pd_df, repartition_nparts, keys): +def test_approx_percentile_groupby(daft_df, service_requests_csv_pd_df, repartition_nparts, keys, with_morsel_size): """Computes approx percentile across groups""" daft_df = ( daft_df.repartition(repartition_nparts) @@ -180,7 +180,7 @@ def test_approx_percentile_groupby(daft_df, service_requests_csv_pd_df, repartit pytest.param(["Borough", "Complaint Type"], id="NumGroupByKeys:2"), ], ) -def test_mean_groupby(daft_df, service_requests_csv_pd_df, repartition_nparts, keys): +def test_mean_groupby(daft_df, service_requests_csv_pd_df, repartition_nparts, keys, with_morsel_size): """Sums across groups""" daft_df = daft_df.repartition(repartition_nparts).groupby(*[col(k) for k in keys]).mean(col("Unique Key")) service_requests_csv_pd_df = service_requests_csv_pd_df.groupby(keys).mean("Unique Key").reset_index() @@ -195,7 +195,7 @@ def test_mean_groupby(daft_df, service_requests_csv_pd_df, repartition_nparts, k pytest.param(["Borough", "Complaint Type"], id="NumGroupByKeys:2"), ], ) -def test_count_groupby(daft_df, service_requests_csv_pd_df, repartition_nparts, keys): +def test_count_groupby(daft_df, service_requests_csv_pd_df, repartition_nparts, keys, with_morsel_size): """count across groups""" daft_df = daft_df.repartition(repartition_nparts).groupby(*[col(k) for k in keys]).count() service_requests_csv_pd_df = 
service_requests_csv_pd_df.groupby(keys).count().reset_index() @@ -213,7 +213,7 @@ def test_count_groupby(daft_df, service_requests_csv_pd_df, repartition_nparts, pytest.param(["Borough", "Complaint Type"], id="NumGroupByKeys:2"), ], ) -def test_min_groupby(daft_df, service_requests_csv_pd_df, repartition_nparts, keys): +def test_min_groupby(daft_df, service_requests_csv_pd_df, repartition_nparts, keys, with_morsel_size): """min across groups""" daft_df = ( daft_df.repartition(repartition_nparts) @@ -234,7 +234,7 @@ def test_min_groupby(daft_df, service_requests_csv_pd_df, repartition_nparts, ke pytest.param(["Borough", "Complaint Type"], id="NumGroupByKeys:2"), ], ) -def test_max_groupby(daft_df, service_requests_csv_pd_df, repartition_nparts, keys): +def test_max_groupby(daft_df, service_requests_csv_pd_df, repartition_nparts, keys, with_morsel_size): """max across groups""" daft_df = ( daft_df.repartition(repartition_nparts) @@ -255,7 +255,7 @@ def test_max_groupby(daft_df, service_requests_csv_pd_df, repartition_nparts, ke pytest.param(["Borough", "Complaint Type"], id="NumGroupSortKeys:2"), ], ) -def test_sum_groupby_sorted(daft_df, service_requests_csv_pd_df, repartition_nparts, keys): +def test_sum_groupby_sorted(daft_df, service_requests_csv_pd_df, repartition_nparts, keys, with_morsel_size): """Test sorting after a groupby""" daft_df = ( daft_df.repartition(repartition_nparts) @@ -277,7 +277,7 @@ def test_sum_groupby_sorted(daft_df, service_requests_csv_pd_df, repartition_npa pytest.param(["Borough", "Complaint Type"], id="NumGroupSortKeys:2"), ], ) -def test_map_groups(daft_df, service_requests_csv_pd_df, repartition_nparts, keys): +def test_map_groups(daft_df, service_requests_csv_pd_df, repartition_nparts, keys, with_morsel_size): """Test map_groups""" @udf(return_dtype=DataType.float64()) diff --git a/tests/cookbook/test_computations.py b/tests/cookbook/test_computations.py index e863c65d85..6a676b5c9d 100644 --- a/tests/cookbook/test_computations.py +++ b/tests/cookbook/test_computations.py @@ -4,7 +4,7 @@ from tests.conftest import assert_df_equals -def test_add_one_to_column(daft_df, service_requests_csv_pd_df, repartition_nparts): +def test_add_one_to_column(daft_df, service_requests_csv_pd_df, repartition_nparts, with_morsel_size): """Creating a new column that is derived from (1 + other_column) and retrieving the top N results""" daft_df = daft_df.repartition(repartition_nparts).with_column("unique_key_mod", col("Unique Key") + 1) service_requests_csv_pd_df["unique_key_mod"] = service_requests_csv_pd_df["Unique Key"] + 1 @@ -12,7 +12,7 @@ def test_add_one_to_column(daft_df, service_requests_csv_pd_df, repartition_npar assert_df_equals(daft_pd_df, service_requests_csv_pd_df) -def test_add_one_to_column_name_override(daft_df, service_requests_csv_pd_df, repartition_nparts): +def test_add_one_to_column_name_override(daft_df, service_requests_csv_pd_df, repartition_nparts, with_morsel_size): """Creating a new column that is derived from (1 + other_column) and retrieving the top N results""" daft_df = ( daft_df.repartition(repartition_nparts) @@ -24,7 +24,7 @@ def test_add_one_to_column_name_override(daft_df, service_requests_csv_pd_df, re assert_df_equals(daft_pd_df, service_requests_csv_pd_df) -def test_add_one_to_column_limit(daft_df, service_requests_csv_pd_df): +def test_add_one_to_column_limit(daft_df, service_requests_csv_pd_df, with_morsel_size): """Creating a new column that is derived from (1 + other_column) and retrieving the top N results""" daft_df = 
daft_df.with_column("unique_key_mod", col("Unique Key") + 1).limit(10) service_requests_csv_pd_df["unique_key_mod"] = service_requests_csv_pd_df["Unique Key"] + 1 @@ -33,7 +33,7 @@ def test_add_one_to_column_limit(daft_df, service_requests_csv_pd_df): assert_df_equals(daft_pd_df, service_requests_csv_pd_df) -def test_add_one_twice_to_column(daft_df, service_requests_csv_pd_df, repartition_nparts): +def test_add_one_twice_to_column(daft_df, service_requests_csv_pd_df, repartition_nparts, with_morsel_size): """Creating a new column that is derived from (1 + other_column) and retrieving the top N results""" daft_df = daft_df.repartition(repartition_nparts).with_column("unique_key_mod", col("Unique Key") + 1) daft_df = daft_df.with_column("unique_key_mod_second", col("unique_key_mod") + 1) @@ -43,7 +43,7 @@ def test_add_one_twice_to_column(daft_df, service_requests_csv_pd_df, repartitio assert_df_equals(daft_pd_df, service_requests_csv_pd_df) -def test_difference_cols(daft_df, service_requests_csv_pd_df, repartition_nparts): +def test_difference_cols(daft_df, service_requests_csv_pd_df, repartition_nparts, with_morsel_size): """Creating a new column that is derived from 2 other columns and retrieving the top N results""" daft_df = daft_df.repartition(repartition_nparts).with_column( "unique_key_mod", col("Unique Key") - col("Unique Key") diff --git a/tests/cookbook/test_count_rows.py b/tests/cookbook/test_count_rows.py index da58d4af64..2b0b36d257 100644 --- a/tests/cookbook/test_count_rows.py +++ b/tests/cookbook/test_count_rows.py @@ -5,7 +5,7 @@ from daft.expressions import col -def test_count_rows(daft_df, service_requests_csv_pd_df, repartition_nparts): +def test_count_rows(daft_df, service_requests_csv_pd_df, repartition_nparts, with_morsel_size): """Count rows for the entire table""" daft_df_row_count = daft_df.repartition(repartition_nparts).count_rows() assert daft_df_row_count == service_requests_csv_pd_df.shape[0] @@ -19,7 +19,7 @@ def test_dataframe_count_no_args(daft_df, service_requests_csv_pd_df): assert results["count"][0] == service_requests_csv_pd_df.shape[0] -def test_filtered_count_rows(daft_df, service_requests_csv_pd_df, repartition_nparts): +def test_filtered_count_rows(daft_df, service_requests_csv_pd_df, repartition_nparts, with_morsel_size): """Count rows on a table filtered by a certain condition""" daft_df_row_count = daft_df.repartition(repartition_nparts).where(col("Borough") == "BROOKLYN").count_rows() @@ -34,14 +34,14 @@ def test_filtered_count_rows(daft_df, service_requests_csv_pd_df, repartition_np pytest.param(["Borough", "Complaint Type"], id="NumGroupByKeys:2"), ], ) -def test_groupby_count_rows(daft_df, service_requests_csv_pd_df, repartition_nparts, keys): +def test_groupby_count_rows(daft_df, service_requests_csv_pd_df, repartition_nparts, keys, with_morsel_size): """Count rows after group by""" daft_df = daft_df.repartition(repartition_nparts).groupby(*[col(k) for k in keys]).sum(col("Unique Key")) service_requests_csv_pd_df = service_requests_csv_pd_df.groupby(keys).sum("Unique Key").reset_index() assert daft_df.count_rows() == len(service_requests_csv_pd_df) -def test_dataframe_length_after_collect(daft_df, service_requests_csv_pd_df, repartition_nparts): +def test_dataframe_length_after_collect(daft_df, service_requests_csv_pd_df, repartition_nparts, with_morsel_size): """Count rows after group by""" daft_df = daft_df.repartition(repartition_nparts).collect() assert len(daft_df) == len(service_requests_csv_pd_df) diff --git 
a/tests/cookbook/test_dataloading.py b/tests/cookbook/test_dataloading.py index 6f1a9587d4..d75497b2fe 100644 --- a/tests/cookbook/test_dataloading.py +++ b/tests/cookbook/test_dataloading.py @@ -10,7 +10,7 @@ from tests.cookbook.assets import COOKBOOK_DATA_CSV -def test_load(daft_df, service_requests_csv_pd_df, repartition_nparts): +def test_load(daft_df, service_requests_csv_pd_df, repartition_nparts, with_morsel_size): """Loading data from a CSV or Parquet works""" pd_slice = service_requests_csv_pd_df daft_slice = daft_df.repartition(repartition_nparts) diff --git a/tests/cookbook/test_distinct.py b/tests/cookbook/test_distinct.py index a209ad80f3..814c295b47 100644 --- a/tests/cookbook/test_distinct.py +++ b/tests/cookbook/test_distinct.py @@ -13,7 +13,7 @@ pytest.param(["Borough", "Complaint Type"], id="NumGroupByKeys:2"), ], ) -def test_distinct_all_columns(daft_df, service_requests_csv_pd_df, repartition_nparts, keys): +def test_distinct_all_columns(daft_df, service_requests_csv_pd_df, repartition_nparts, keys, with_morsel_size): """Sums across groups""" daft_df = daft_df.repartition(repartition_nparts).select(*[col(k) for k in keys]).distinct() diff --git a/tests/cookbook/test_filter.py b/tests/cookbook/test_filter.py index 2009c0f73d..26320bc958 100644 --- a/tests/cookbook/test_filter.py +++ b/tests/cookbook/test_filter.py @@ -36,7 +36,7 @@ ), ], ) -def test_filter(daft_df_ops, daft_df, service_requests_csv_pd_df, repartition_nparts): +def test_filter(daft_df_ops, daft_df, service_requests_csv_pd_df, repartition_nparts, with_morsel_size): """Filter the dataframe, retrieve the top N results and select a subset of columns""" daft_noise_complaints = daft_df_ops(daft_df.repartition(repartition_nparts)) @@ -104,7 +104,7 @@ def test_filter(daft_df_ops, daft_df, service_requests_csv_pd_df, repartition_np ), ], ) -def test_complex_filter(daft_df_ops, daft_df, service_requests_csv_pd_df, repartition_nparts): +def test_complex_filter(daft_df_ops, daft_df, service_requests_csv_pd_df, repartition_nparts, with_morsel_size): """Filter the dataframe with a complex filter and select a subset of columns""" daft_noise_complaints_brooklyn = daft_df_ops(daft_df.repartition(repartition_nparts)) @@ -163,7 +163,7 @@ def test_complex_filter(daft_df_ops, daft_df, service_requests_csv_pd_df, repart ), ], ) -def test_chain_filter(daft_df_ops, daft_df, service_requests_csv_pd_df, repartition_nparts): +def test_chain_filter(daft_df_ops, daft_df, service_requests_csv_pd_df, repartition_nparts, with_morsel_size): """Filter the dataframe with a chain of filters and select a subset of columns""" daft_noise_complaints_brooklyn = daft_df_ops(daft_df.repartition(repartition_nparts)) diff --git a/tests/cookbook/test_image.py b/tests/cookbook/test_image.py index 86cfc85551..615abeb0f4 100644 --- a/tests/cookbook/test_image.py +++ b/tests/cookbook/test_image.py @@ -12,7 +12,7 @@ from tests.cookbook.assets import ASSET_FOLDER -def test_image_resize_mixed_modes(): +def test_image_resize_mixed_modes(with_morsel_size) -> None: rgba = np.ones((2, 2, 4), dtype=np.uint8) rgba[..., 1] = 2 rgba[..., 2] = 3 @@ -64,7 +64,7 @@ def test_image_resize_mixed_modes(): assert as_py[-1] is None -def test_image_decode() -> None: +def test_image_decode(with_morsel_size) -> None: df = ( daft.from_glob_path(f"{ASSET_FOLDER}/images/**") .into_partitions(2) @@ -75,7 +75,7 @@ def test_image_decode() -> None: df.collect() -def test_image_encode() -> None: +def test_image_encode(with_morsel_size) -> None: file_format = "png" mode = "RGB" 
np_dtype = np.uint8 diff --git a/tests/cookbook/test_joins.py b/tests/cookbook/test_joins.py index d80dce72a2..856f33e653 100644 --- a/tests/cookbook/test_joins.py +++ b/tests/cookbook/test_joins.py @@ -18,7 +18,7 @@ def skip_invalid_join_strategies(join_strategy): [None, "hash", "sort_merge", "sort_merge_aligned_boundaries", "broadcast"], indirect=True, ) -def test_simple_join(join_strategy, daft_df, service_requests_csv_pd_df, repartition_nparts): +def test_simple_join(join_strategy, daft_df, service_requests_csv_pd_df, repartition_nparts, with_morsel_size): skip_invalid_join_strategies(join_strategy) daft_df = daft_df.repartition(repartition_nparts) daft_df_left = daft_df.select(col("Unique Key"), col("Borough")) @@ -41,7 +41,7 @@ def test_simple_join(join_strategy, daft_df, service_requests_csv_pd_df, reparti [None, "hash", "sort_merge", "sort_merge_aligned_boundaries", "broadcast"], indirect=True, ) -def test_simple_self_join(join_strategy, daft_df, service_requests_csv_pd_df, repartition_nparts): +def test_simple_self_join(join_strategy, daft_df, service_requests_csv_pd_df, repartition_nparts, with_morsel_size): skip_invalid_join_strategies(join_strategy) daft_df = daft_df.repartition(repartition_nparts) daft_df = daft_df.select(col("Unique Key"), col("Borough")) @@ -68,7 +68,9 @@ def test_simple_self_join(join_strategy, daft_df, service_requests_csv_pd_df, re [None, "hash", "sort_merge", "sort_merge_aligned_boundaries", "broadcast"], indirect=True, ) -def test_simple_join_missing_rvalues(join_strategy, daft_df, service_requests_csv_pd_df, repartition_nparts): +def test_simple_join_missing_rvalues( + join_strategy, daft_df, service_requests_csv_pd_df, repartition_nparts, with_morsel_size +): skip_invalid_join_strategies(join_strategy) daft_df_right = daft_df.sort("Unique Key").limit(25).repartition(repartition_nparts) daft_df_left = daft_df.repartition(repartition_nparts) @@ -94,7 +96,9 @@ def test_simple_join_missing_rvalues(join_strategy, daft_df, service_requests_cs [None, "hash", "sort_merge", "sort_merge_aligned_boundaries", "broadcast"], indirect=True, ) -def test_simple_join_missing_lvalues(join_strategy, daft_df, service_requests_csv_pd_df, repartition_nparts): +def test_simple_join_missing_lvalues( + join_strategy, daft_df, service_requests_csv_pd_df, repartition_nparts, with_morsel_size +): skip_invalid_join_strategies(join_strategy) daft_df_right = daft_df.repartition(repartition_nparts) daft_df_left = daft_df.sort(col("Unique Key")).limit(25).repartition(repartition_nparts) diff --git a/tests/cookbook/test_pandas_cookbook.py b/tests/cookbook/test_pandas_cookbook.py index c852568f3f..2e967ec4b1 100644 --- a/tests/cookbook/test_pandas_cookbook.py +++ b/tests/cookbook/test_pandas_cookbook.py @@ -18,7 +18,7 @@ IF_THEN_DATA = {"AAA": [4, 5, 6, 7], "BBB": [10, 20, 30, 40], "CCC": [100, 50, -30, -50]} -def test_if_then(repartition_nparts): +def test_if_then(repartition_nparts, with_morsel_size): daft_df = daft.from_pydict(IF_THEN_DATA).repartition(repartition_nparts) pd_df = pd.DataFrame.from_dict(IF_THEN_DATA) daft_df = daft_df.with_column("BBB", (col("AAA") >= 5).if_else(-1, col("BBB"))) @@ -27,7 +27,7 @@ def test_if_then(repartition_nparts): assert_df_equals(daft_pd_df, pd_df, sort_key="AAA") -def test_if_then_2_cols(repartition_nparts): +def test_if_then_2_cols(repartition_nparts, with_morsel_size): daft_df = daft.from_pydict(IF_THEN_DATA).repartition(repartition_nparts) pd_df = pd.DataFrame.from_dict(IF_THEN_DATA) daft_df = daft_df.with_column("BBB", (col("AAA") >= 
5).if_else(2000, col("BBB"))).with_column( @@ -39,7 +39,7 @@ def test_if_then_2_cols(repartition_nparts): assert_df_equals(daft_pd_df, pd_df, sort_key="AAA") -def test_if_then_numpy_where(repartition_nparts): +def test_if_then_numpy_where(repartition_nparts, with_morsel_size): daft_df = daft.from_pydict(IF_THEN_DATA).repartition(repartition_nparts) pd_df = pd.DataFrame.from_dict(IF_THEN_DATA) daft_df = daft_df.with_column("logic", (col("AAA") > 5).if_else("high", lit("low"))) @@ -55,7 +55,7 @@ def test_if_then_numpy_where(repartition_nparts): SPLITTING_DATA = {"AAA": [4, 5, 6, 7], "BBB": [10, 20, 30, 40], "CCC": [100, 50, -30, -50]} -def test_split_frame_boolean_criterion(repartition_nparts): +def test_split_frame_boolean_criterion(repartition_nparts, with_morsel_size): daft_df = daft.from_pydict(SPLITTING_DATA).repartition(repartition_nparts) pd_df = pd.DataFrame.from_dict(SPLITTING_DATA) daft_df = daft_df.where(col("AAA") <= 5) @@ -71,7 +71,7 @@ def test_split_frame_boolean_criterion(repartition_nparts): BUILDING_DATA = {"AAA": [4, 5, 6, 7], "BBB": [10, 20, 30, 40], "CCC": [100, 50, -30, -50]} -def test_multi_criteria_and(repartition_nparts): +def test_multi_criteria_and(repartition_nparts, with_morsel_size): daft_df = daft.from_pydict(BUILDING_DATA).repartition(repartition_nparts) pd_df = pd.DataFrame.from_dict(BUILDING_DATA) daft_df = daft_df.where((col("BBB") < 25) & (col("CCC") >= -40)).select(col("AAA")) @@ -80,7 +80,7 @@ def test_multi_criteria_and(repartition_nparts): assert_df_equals(daft_pd_df, pd_df, sort_key="AAA") -def test_multi_criteria_or(repartition_nparts): +def test_multi_criteria_or(repartition_nparts, with_morsel_size): daft_df = daft.from_pydict(BUILDING_DATA).repartition(repartition_nparts) pd_df = pd.DataFrame.from_dict(BUILDING_DATA) daft_df = daft_df.where((col("BBB") > 25) | (col("CCC") >= -40)).select(col("AAA")) @@ -89,7 +89,7 @@ def test_multi_criteria_or(repartition_nparts): assert_df_equals(daft_pd_df, pd_df, sort_key="AAA") -def test_multi_criteria_or_assignment(repartition_nparts): +def test_multi_criteria_or_assignment(repartition_nparts, with_morsel_size): daft_df = daft.from_pydict(BUILDING_DATA).repartition(repartition_nparts) pd_df = pd.DataFrame.from_dict(BUILDING_DATA) daft_df = daft_df.with_column( @@ -100,7 +100,7 @@ def test_multi_criteria_or_assignment(repartition_nparts): assert_df_equals(daft_pd_df, pd_df, sort_key="BBB") -def test_select_rows_closest_to_certain_value_using_argsort(repartition_nparts): +def test_select_rows_closest_to_certain_value_using_argsort(repartition_nparts, with_morsel_size): daft_df = daft.from_pydict(BUILDING_DATA).repartition(repartition_nparts) pd_df = pd.DataFrame.from_dict(BUILDING_DATA) aValue = 43.0 @@ -118,7 +118,7 @@ def test_select_rows_closest_to_certain_value_using_argsort(repartition_nparts): @pytest.mark.skip(reason="Requires F.row_number() and Expression.is_in(...)") -def test_splitting_by_row_index(repartition_nparts): +def test_splitting_by_row_index(repartition_nparts, with_morsel_size): daft_df = daft.from_pydict(SELECTION_DATA).repartition(repartition_nparts) pd_df = pd.DataFrame.from_dict(SELECTION_DATA) daft_df = daft_df.where((col("AAA") <= 6) & F.row_number().is_in([0, 2, 4])) # noqa: F821 @@ -128,7 +128,7 @@ def test_splitting_by_row_index(repartition_nparts): @pytest.mark.skip(reason="Requires F.row_number()") -def test_splitting_by_row_range(repartition_nparts): +def test_splitting_by_row_range(repartition_nparts, with_morsel_size): daft_df = 
daft.from_pydict(SELECTION_DATA).repartition(repartition_nparts) pd_df = pd.DataFrame.from_dict(SELECTION_DATA) daft_df = daft_df.where((F.row_number() >= 0) & (F.row_number() < 3)) # noqa: F821 @@ -145,7 +145,7 @@ def test_splitting_by_row_range(repartition_nparts): @pytest.mark.skip(reason="Requires Expression.applymap((val) => result)") -def test_efficiently_and_dynamically_creating_new_columns_using_applymap(repartition_nparts): +def test_efficiently_and_dynamically_creating_new_columns_using_applymap(repartition_nparts, with_morsel_size): daft_df = daft.from_pydict(APPLYMAP_DATA).repartition(repartition_nparts) pd_df = pd.DataFrame.from_dict(APPLYMAP_DATA) source_cols = pd_df.columns @@ -165,7 +165,7 @@ def test_efficiently_and_dynamically_creating_new_columns_using_applymap(reparti @pytest.mark.skip(reason="Requires .first() aggregations") -def test_keep_other_columns_when_using_min_with_groupby(repartition_nparts): +def test_keep_other_columns_when_using_min_with_groupby(repartition_nparts, with_morsel_size): daft_df = daft.from_pydict(APPLYMAP_DATA).repartition(repartition_nparts) pd_df = pd.DataFrame.from_dict(APPLYMAP_DATA) daft_df = daft_df.groupby(col("AAA")).min(col("BBB")) @@ -202,7 +202,7 @@ def test_keep_other_columns_when_using_min_with_groupby(repartition_nparts): # assert_df_equals(daft_df, pd_df, sort_key="animal") -def test_applying_to_different_items_in_group(repartition_nparts): +def test_applying_to_different_items_in_group(repartition_nparts, with_morsel_size): daft_df = daft.from_pydict(GROUPBY_DATA).repartition(repartition_nparts) pd_df = pd.DataFrame.from_dict(GROUPBY_DATA) daft_df = daft_df.with_column( @@ -238,7 +238,7 @@ def GrowUp(x): } -def test_self_join(repartition_nparts): +def test_self_join(repartition_nparts, with_morsel_size): daft_df = daft.from_pydict(JOIN_DATA).repartition(repartition_nparts) daft_df = daft_df.with_column("Test_1", col("Test_0") - 1) daft_df = daft_df.join( diff --git a/tests/cookbook/test_sorting.py b/tests/cookbook/test_sorting.py index 71a098e8d0..83a3e6928b 100644 --- a/tests/cookbook/test_sorting.py +++ b/tests/cookbook/test_sorting.py @@ -7,7 +7,7 @@ from tests.conftest import assert_df_equals -def test_sorted_by_expr(daft_df, service_requests_csv_pd_df, repartition_nparts): +def test_sorted_by_expr(daft_df, service_requests_csv_pd_df, repartition_nparts, with_morsel_size): """Sort by a column that undergoes an expression""" daft_df = daft_df.repartition(repartition_nparts) daft_sorted_df = daft_df.sort(((col("Unique Key") % 2) == 0).if_else(col("Unique Key"), col("Unique Key") * -1)) @@ -36,7 +36,7 @@ def test_sorted_by_expr(daft_df, service_requests_csv_pd_df, repartition_nparts) pytest.param(["Borough", "Unique Key"], id="NumSortKeys:2"), ], ) -def test_get_sorted(daft_df, service_requests_csv_pd_df, repartition_nparts, sort_keys): +def test_get_sorted(daft_df, service_requests_csv_pd_df, repartition_nparts, sort_keys, with_morsel_size): """Sort by a column""" daft_df = daft_df.repartition(repartition_nparts) daft_sorted_df = daft_df.sort([col(k) for k in sort_keys], desc=True) @@ -55,7 +55,7 @@ def test_get_sorted(daft_df, service_requests_csv_pd_df, repartition_nparts, sor pytest.param(["Borough", "Unique Key"], id="NumSortKeys:2"), ], ) -def test_get_sorted_top_n(daft_df, service_requests_csv_pd_df, repartition_nparts, sort_keys): +def test_get_sorted_top_n(daft_df, service_requests_csv_pd_df, repartition_nparts, sort_keys, with_morsel_size): """Sort by a column""" daft_df = daft_df.repartition(repartition_nparts) 
daft_sorted_df = daft_df.sort([col(k) for k in sort_keys], desc=True).limit(100) @@ -74,7 +74,9 @@ def test_get_sorted_top_n(daft_df, service_requests_csv_pd_df, repartition_npart pytest.param(["Borough", "Unique Key"], id="NumSortKeys:2"), ], ) -def test_get_sorted_top_n_flipped_desc(daft_df, service_requests_csv_pd_df, repartition_nparts, sort_keys): +def test_get_sorted_top_n_flipped_desc( + daft_df, service_requests_csv_pd_df, repartition_nparts, sort_keys, with_morsel_size +): """Sort by a column""" daft_df = daft_df.repartition(repartition_nparts) desc_list = [True] @@ -103,7 +105,9 @@ def test_get_sorted_top_n_flipped_desc(daft_df, service_requests_csv_pd_df, repa ), ], ) -def test_get_sorted_top_n_projected(daft_df_ops, daft_df, service_requests_csv_pd_df, repartition_nparts): +def test_get_sorted_top_n_projected( + daft_df_ops, daft_df, service_requests_csv_pd_df, repartition_nparts, with_morsel_size +): """Sort by a column and retrieve specific columns from the top N results""" daft_df = daft_df.repartition(repartition_nparts) expected = service_requests_csv_pd_df.sort_values(by="Unique Key", ascending=False)[ diff --git a/tests/cookbook/test_write.py b/tests/cookbook/test_write.py index c00e00f1ac..093439bdfb 100644 --- a/tests/cookbook/test_write.py +++ b/tests/cookbook/test_write.py @@ -13,7 +13,7 @@ PYARROW_GE_7_0_0 = tuple(int(s) for s in pa.__version__.split(".") if s.isnumeric()) >= (7, 0, 0) -def test_parquet_write(tmp_path): +def test_parquet_write(tmp_path, with_morsel_size): df = daft.read_csv(COOKBOOK_DATA_CSV) pd_df = df.write_parquet(tmp_path) @@ -26,7 +26,7 @@ def test_parquet_write(tmp_path): assert len(pd_df._preview.preview_partition) == 1 -def test_parquet_write_with_partitioning(tmp_path): +def test_parquet_write_with_partitioning(tmp_path, with_morsel_size): df = daft.read_csv(COOKBOOK_DATA_CSV) pd_df = df.write_parquet(tmp_path, partition_cols=["Borough"]) @@ -40,7 +40,7 @@ def test_parquet_write_with_partitioning(tmp_path): assert len(pd_df._preview.preview_partition) == 5 -def test_empty_parquet_write_without_partitioning(tmp_path): +def test_empty_parquet_write_without_partitioning(tmp_path, with_morsel_size): df = daft.read_csv(COOKBOOK_DATA_CSV) df = df.where(daft.lit(False)) @@ -52,7 +52,7 @@ def test_empty_parquet_write_without_partitioning(tmp_path): assert len(pd_df._preview.preview_partition) == 1 -def test_empty_parquet_write_with_partitioning(tmp_path): +def test_empty_parquet_write_with_partitioning(tmp_path, with_morsel_size): df = daft.read_csv(COOKBOOK_DATA_CSV) df = df.where(daft.lit(False)) @@ -64,7 +64,7 @@ def test_empty_parquet_write_with_partitioning(tmp_path): assert len(output_files._preview.preview_partition) == 1 -def test_parquet_write_with_partitioning_readback_values(tmp_path): +def test_parquet_write_with_partitioning_readback_values(tmp_path, with_morsel_size): df = daft.read_csv(COOKBOOK_DATA_CSV) output_files = df.write_parquet(tmp_path, partition_cols=["Borough"]) @@ -99,7 +99,7 @@ def test_parquet_write_with_partitioning_readback_values(tmp_path): (daft.col("date").partitioning.years(), "date_years", [54]), ], ) -def test_parquet_write_with_iceberg_date_partitioning(exp, key, answer, tmp_path): +def test_parquet_write_with_iceberg_date_partitioning(exp, key, answer, tmp_path, with_morsel_size): data = { "id": [1, 2, 3, 4, 5], "date": [ @@ -126,7 +126,7 @@ def test_parquet_write_with_iceberg_date_partitioning(exp, key, answer, tmp_path (daft.col("id").partitioning.iceberg_truncate(10), "id_trunc", [0, 10, 20, 40]), ], ) -def 
test_parquet_write_with_iceberg_bucket_and_trunc(exp, key, answer, tmp_path): +def test_parquet_write_with_iceberg_bucket_and_trunc(exp, key, answer, tmp_path, with_morsel_size): data = { "id": [1, 12, 23, 24, 45], "date": [ @@ -150,7 +150,7 @@ def test_parquet_write_with_iceberg_bucket_and_trunc(exp, key, answer, tmp_path) not PYARROW_GE_7_0_0, reason="We only use pyarrow datasets 7 for this test", ) -def test_parquet_write_with_null_values(tmp_path): +def test_parquet_write_with_null_values(tmp_path, with_morsel_size): df = daft.from_pydict({"x": [1, 2, 3, None]}) df.write_parquet(tmp_path, partition_cols=[df["x"].alias("y")]) ds = pads.dataset(tmp_path, format="parquet", partitioning=pads.HivePartitioning(pa.schema([("y", pa.int64())]))) @@ -168,7 +168,7 @@ def smaller_parquet_target_filesize(): not PYARROW_GE_7_0_0, reason="We only use pyarrow datasets 7 for this test", ) -def test_parquet_write_multifile(tmp_path, smaller_parquet_target_filesize): +def test_parquet_write_multifile(tmp_path, smaller_parquet_target_filesize, with_morsel_size): data = {"x": list(range(1_000))} df = daft.from_pydict(data) df2 = df.write_parquet(tmp_path) @@ -181,7 +181,7 @@ def test_parquet_write_multifile(tmp_path, smaller_parquet_target_filesize): not PYARROW_GE_7_0_0, reason="We only use pyarrow datasets 7 for this test", ) -def test_parquet_write_multifile_with_partitioning(tmp_path, smaller_parquet_target_filesize): +def test_parquet_write_multifile_with_partitioning(tmp_path, smaller_parquet_target_filesize, with_morsel_size): data = {"x": list(range(1_000))} df = daft.from_pydict(data) df2 = df.write_parquet(tmp_path, partition_cols=[df["x"].alias("y") % 2]) @@ -193,7 +193,7 @@ def test_parquet_write_multifile_with_partitioning(tmp_path, smaller_parquet_tar assert readback["y"] == [y % 2 for y in data["x"]] -def test_parquet_write_with_some_empty_partitions(tmp_path): +def test_parquet_write_with_some_empty_partitions(tmp_path, with_morsel_size): data = {"x": [1, 2, 3], "y": ["a", "b", "c"]} daft.from_pydict(data).into_partitions(4).write_parquet(tmp_path) @@ -201,7 +201,7 @@ def test_parquet_write_with_some_empty_partitions(tmp_path): assert read_back == data -def test_parquet_partitioned_write_with_some_empty_partitions(tmp_path): +def test_parquet_partitioned_write_with_some_empty_partitions(tmp_path, with_morsel_size): data = {"x": [1, 2, 3], "y": ["a", "b", "c"]} output_files = daft.from_pydict(data).into_partitions(4).write_parquet(tmp_path, partition_cols=["x"]) @@ -211,7 +211,7 @@ def test_parquet_partitioned_write_with_some_empty_partitions(tmp_path): assert read_back == data -def test_csv_write(tmp_path): +def test_csv_write(tmp_path, with_morsel_size): df = daft.read_csv(COOKBOOK_DATA_CSV) pd_df = df.write_csv(tmp_path) @@ -225,7 +225,7 @@ def test_csv_write(tmp_path): assert len(pd_df._preview.preview_partition) == 1 -def test_csv_write_with_partitioning(tmp_path): +def test_csv_write_with_partitioning(tmp_path, with_morsel_size): df = daft.read_csv(COOKBOOK_DATA_CSV) schema = df.schema() names = schema.column_names() @@ -240,7 +240,7 @@ def test_csv_write_with_partitioning(tmp_path): assert len(pd_df) == 5 -def test_empty_csv_write(tmp_path): +def test_empty_csv_write(tmp_path, with_morsel_size): df = daft.read_csv(COOKBOOK_DATA_CSV) df = df.where(daft.lit(False)) @@ -258,7 +258,7 @@ def test_empty_csv_write(tmp_path): assert len(pd_df._preview.preview_partition) == 1 -def test_empty_csv_write_with_partitioning(tmp_path): +def test_empty_csv_write_with_partitioning(tmp_path, 
with_morsel_size): df = daft.read_csv(COOKBOOK_DATA_CSV) df = df.where(daft.lit(False)) @@ -276,7 +276,7 @@ def test_empty_csv_write_with_partitioning(tmp_path): assert len(pd_df._preview.preview_partition) == 1 -def test_csv_write_with_some_empty_partitions(tmp_path): +def test_csv_write_with_some_empty_partitions(tmp_path, with_morsel_size): data = {"x": [1, 2, 3], "y": ["a", "b", "c"]} daft.from_pydict(data).into_partitions(4).write_csv(tmp_path) @@ -284,7 +284,7 @@ def test_csv_write_with_some_empty_partitions(tmp_path): assert read_back == data -def test_csv_partitioned_write_with_some_empty_partitions(tmp_path): +def test_csv_partitioned_write_with_some_empty_partitions(tmp_path, with_morsel_size): data = {"x": [1, 2, 3], "y": ["a", "b", "c"]} output_files = daft.from_pydict(data).into_partitions(4).write_csv(tmp_path, partition_cols=["x"]) diff --git a/tests/dataframe/test_aggregations.py b/tests/dataframe/test_aggregations.py index 74fe889ce0..f942410d77 100644 --- a/tests/dataframe/test_aggregations.py +++ b/tests/dataframe/test_aggregations.py @@ -16,7 +16,7 @@ @pytest.mark.parametrize("repartition_nparts", [1, 2, 4]) -def test_agg_global(make_df, repartition_nparts): +def test_agg_global(make_df, repartition_nparts, with_morsel_size): daft_df = make_df( { "id": [1, 2, 3], @@ -48,7 +48,7 @@ def test_agg_global(make_df, repartition_nparts): @pytest.mark.parametrize("repartition_nparts", [1, 2, 4]) -def test_agg_global_all_null(make_df, repartition_nparts): +def test_agg_global_all_null(make_df, repartition_nparts, with_morsel_size): daft_df = make_df( { "id": [0, 1, 2, 3], @@ -115,7 +115,7 @@ def test_agg_global_empty(make_df): @pytest.mark.parametrize("repartition_nparts", [1, 2, 7]) -def test_agg_groupby(make_df, repartition_nparts): +def test_agg_groupby(make_df, repartition_nparts, with_morsel_size): daft_df = make_df( { "group": [1, 1, 1, 2, 2, 2], @@ -157,7 +157,7 @@ def test_agg_groupby(make_df, repartition_nparts): @pytest.mark.parametrize("repartition_nparts", [1, 2, 5]) -def test_agg_groupby_all_null(make_df, repartition_nparts): +def test_agg_groupby_all_null(make_df, repartition_nparts, with_morsel_size): daft_df = make_df( { "id": [0, 1, 2, 3, 4], @@ -212,7 +212,7 @@ def test_agg_groupby_null_type_column(make_df): @pytest.mark.parametrize("repartition_nparts", [1, 2, 5]) -def test_null_groupby_keys(make_df, repartition_nparts): +def test_null_groupby_keys(make_df, repartition_nparts, with_morsel_size): daft_df = make_df( { "id": [0, 1, 2, 3, 4], @@ -235,7 +235,7 @@ def test_null_groupby_keys(make_df, repartition_nparts): @pytest.mark.parametrize("repartition_nparts", [1, 2, 4]) -def test_all_null_groupby_keys(make_df, repartition_nparts): +def test_all_null_groupby_keys(make_df, repartition_nparts, with_morsel_size): daft_df = make_df( { "id": [0, 1, 2], @@ -315,7 +315,7 @@ def test_agg_groupby_empty(make_df): @pytest.mark.parametrize("repartition_nparts", [1, 2, 7]) -def test_agg_groupby_with_alias(make_df, repartition_nparts): +def test_agg_groupby_with_alias(make_df, repartition_nparts, with_morsel_size): daft_df = make_df( { "group": [1, 1, 1, 2, 2, 2], @@ -360,6 +360,9 @@ def test_agg_groupby_with_alias(make_df, repartition_nparts): class CustomObject: val: int + def __hash__(self): + return hash(self.val) + def test_agg_pyobjects(): objects = [CustomObject(val=0), None, CustomObject(val=1)] @@ -375,7 +378,7 @@ def test_agg_pyobjects(): res = df.to_pydict() assert res["count"] == [2] - assert res["list"] == [objects] + assert set(res["list"][0]) == set(objects) 
def test_groupby_agg_pyobjects(): @@ -397,11 +400,12 @@ def test_groupby_agg_pyobjects(): res = df.to_pydict() assert res["groups"] == [1, 2] assert res["count"] == [2, 1] - assert res["list"] == [[objects[0], objects[2], objects[4]], [objects[1], objects[3]]] + assert set(res["list"][0]) == set([objects[0], objects[2], objects[4]]) + assert set(res["list"][1]) == set([objects[1], objects[3]]) @pytest.mark.parametrize("shuffle_aggregation_default_partitions", [None, 20]) -def test_groupby_result_partitions_smaller_than_input(shuffle_aggregation_default_partitions): +def test_groupby_result_partitions_smaller_than_input(shuffle_aggregation_default_partitions, with_morsel_size): if shuffle_aggregation_default_partitions is None: min_partitions = get_context().daft_execution_config.shuffle_aggregation_default_partitions else: @@ -428,7 +432,7 @@ def test_groupby_result_partitions_smaller_than_input(shuffle_aggregation_defaul @pytest.mark.parametrize("repartition_nparts", [1, 2, 4]) -def test_agg_any_value(make_df, repartition_nparts): +def test_agg_any_value(make_df, repartition_nparts, with_morsel_size): daft_df = make_df( { "group": [1, 1, 1, 2, 2, 2], @@ -446,7 +450,7 @@ def test_agg_any_value(make_df, repartition_nparts): @pytest.mark.parametrize("repartition_nparts", [1, 2, 4]) -def test_agg_any_value_ignore_nulls(make_df, repartition_nparts): +def test_agg_any_value_ignore_nulls(make_df, repartition_nparts, with_morsel_size): daft_df = make_df( { "group": [1, 1, 1, 2, 2, 2, 3, 3, 3], diff --git a/tests/dataframe/test_approx_count_distinct.py b/tests/dataframe/test_approx_count_distinct.py index 68d7057ca0..ac664009fb 100644 --- a/tests/dataframe/test_approx_count_distinct.py +++ b/tests/dataframe/test_approx_count_distinct.py @@ -48,7 +48,7 @@ def assert_equal(df: daft.DataFrame, expected): # We want to test cases in which the "singular partition" optimization is hit and cases where it's not. @pytest.mark.parametrize("data_and_expected", TESTS) @pytest.mark.parametrize("partition_size", [None, 2, 3]) -def test_approx_count_distinct(data_and_expected, partition_size): +def test_approx_count_distinct(data_and_expected, partition_size, with_morsel_size): data, expected = data_and_expected df = make_df(data) if partition_size: @@ -61,7 +61,7 @@ def test_approx_count_distinct(data_and_expected, partition_size): # Test creating data with empty partitions against the HLL algo. # We want to test if empty partitions mess with the final global result. @pytest.mark.parametrize("data_and_expected", TESTS) -def test_approx_count_distinct_on_dfs_with_empty_partitions(data_and_expected): +def test_approx_count_distinct_on_dfs_with_empty_partitions(data_and_expected, with_morsel_size): data, expected = data_and_expected df = make_asymmetric_df(data) df = df.agg(col("a").approx_count_distinct()) @@ -75,7 +75,7 @@ def test_approx_count_distinct_on_dfs_with_empty_partitions(data_and_expected): # We should always test `NULL` values as the "absence" of values. # Therefore, the existence of a `NULL` should never affect the approx_count_distinct value that is returned (even if it's in a column of type `NULL`). 
@pytest.mark.parametrize("data_and_expected", TESTS) -def test_approx_count_distinct_on_null_values(data_and_expected): +def test_approx_count_distinct_on_null_values(data_and_expected, with_morsel_size): data, expected = data_and_expected data = data + [None] * 10 df = make_df(data) diff --git a/tests/dataframe/test_concat.py b/tests/dataframe/test_concat.py index f3caf56bb1..3e18e1e80b 100644 --- a/tests/dataframe/test_concat.py +++ b/tests/dataframe/test_concat.py @@ -3,20 +3,20 @@ import pytest -def test_simple_concat(make_df): +def test_simple_concat(make_df, with_morsel_size): df1 = make_df({"foo": [1, 2, 3]}) df2 = make_df({"foo": [4, 5, 6]}) result = df1.concat(df2) assert result.to_pydict() == {"foo": [1, 2, 3, 4, 5, 6]} -def test_concat_schema_mismatch(make_df): +def test_concat_schema_mismatch(make_df, with_morsel_size): df1 = make_df({"foo": [1, 2, 3]}) df2 = make_df({"foo": ["4", "5", "6"]}) with pytest.raises(ValueError): df1.concat(df2) -def test_self_concat(make_df): +def test_self_concat(make_df, with_morsel_size): df = make_df({"foo": [1, 2, 3]}) assert df.concat(df).to_pydict() == {"foo": [1, 2, 3, 1, 2, 3]} diff --git a/tests/dataframe/test_distinct.py b/tests/dataframe/test_distinct.py index 8e4b2c0a85..d09e0cefcd 100644 --- a/tests/dataframe/test_distinct.py +++ b/tests/dataframe/test_distinct.py @@ -8,7 +8,7 @@ @pytest.mark.parametrize("repartition_nparts", [1, 2, 5]) -def test_distinct_with_nulls(make_df, repartition_nparts): +def test_distinct_with_nulls(make_df, repartition_nparts, with_morsel_size): daft_df = make_df( { "id": [1, None, None, None], @@ -28,7 +28,7 @@ def test_distinct_with_nulls(make_df, repartition_nparts): @pytest.mark.parametrize("repartition_nparts", [1, 2, 5]) -def test_distinct_with_all_nulls(make_df, repartition_nparts): +def test_distinct_with_all_nulls(make_df, repartition_nparts, with_morsel_size): daft_df = make_df( { "id": [None, None, None, None], @@ -48,7 +48,7 @@ def test_distinct_with_all_nulls(make_df, repartition_nparts): @pytest.mark.parametrize("repartition_nparts", [1, 2]) -def test_distinct_with_empty(make_df, repartition_nparts): +def test_distinct_with_empty(make_df, repartition_nparts, with_morsel_size): daft_df = make_df( { "id": [1], diff --git a/tests/dataframe/test_iter.py b/tests/dataframe/test_iter.py index 8658e5da30..7d4ede270d 100644 --- a/tests/dataframe/test_iter.py +++ b/tests/dataframe/test_iter.py @@ -3,12 +3,6 @@ import pytest import daft -from daft import context - -pytestmark = pytest.mark.skipif( - context.get_context().daft_execution_config.enable_native_executor is True, - reason="Native executor fails for these tests", -) class MockException(Exception): @@ -33,25 +27,26 @@ def test_iter_partitions(make_df, materialized): # Test that df.iter_partitions() produces partitions in the correct order. # It should work regardless of whether the dataframe has already been materialized or not. 
- df = make_df({"a": list(range(10))}).into_partitions(5).with_column("b", daft.col("a") + 100) + with daft.execution_config_ctx(default_morsel_size=2): + df = make_df({"a": list(range(10))}).into_partitions(5).with_column("b", daft.col("a") + 100) - if materialized: - df = df.collect() + if materialized: + df = df.collect() - parts = list(df.iter_partitions()) - if daft.context.get_context().runner_config.name == "ray": - import ray + parts = list(df.iter_partitions()) + if daft.context.get_context().runner_config.name == "ray": + import ray - parts = ray.get(parts) - parts = [_.to_pydict() for _ in parts] + parts = ray.get(parts) + parts = [_.to_pydict() for _ in parts] - assert parts == [ - {"a": [0, 1], "b": [100, 101]}, - {"a": [2, 3], "b": [102, 103]}, - {"a": [4, 5], "b": [104, 105]}, - {"a": [6, 7], "b": [106, 107]}, - {"a": [8, 9], "b": [108, 109]}, - ] + assert parts == [ + {"a": [0, 1], "b": [100, 101]}, + {"a": [2, 3], "b": [102, 103]}, + {"a": [4, 5], "b": [104, 105]}, + {"a": [6, 7], "b": [106, 107]}, + {"a": [8, 9], "b": [108, 109]}, + ] def test_iter_exception(make_df): @@ -66,20 +61,21 @@ def echo_or_trigger(s): else: return s - df = make_df({"a": list(range(200))}).into_partitions(100).with_column("b", echo_or_trigger(daft.col("a"))) + with daft.execution_config_ctx(default_morsel_size=2): + df = make_df({"a": list(range(200))}).into_partitions(100).with_column("b", echo_or_trigger(daft.col("a"))) - it = iter(df) - assert next(it) == {"a": 0, "b": 0} + it = iter(df) + assert next(it) == {"a": 0, "b": 0} - # Ensure the exception does trigger if execution continues. - with pytest.raises(RuntimeError) as exc_info: - list(it) + # Ensure the exception does trigger if execution continues. + with pytest.raises(RuntimeError) as exc_info: + list(it) - # Ray's wrapping of the exception loses information about the `.cause`, but preserves it in the string error message - if daft.context.get_context().runner_config.name == "ray": - assert "MockException" in str(exc_info.value) - else: - assert isinstance(exc_info.value.__cause__, MockException) + # Ray's wrapping of the exception loses information about the `.cause`, but preserves it in the string error message + if daft.context.get_context().runner_config.name == "ray": + assert "MockException" in str(exc_info.value) + else: + assert isinstance(exc_info.value.__cause__, MockException) def test_iter_partitions_exception(make_df): @@ -94,26 +90,27 @@ def echo_or_trigger(s): else: return s - df = make_df({"a": list(range(200))}).into_partitions(100).with_column("b", echo_or_trigger(daft.col("a"))) + with daft.execution_config_ctx(default_morsel_size=2): + df = make_df({"a": list(range(200))}).into_partitions(100).with_column("b", echo_or_trigger(daft.col("a"))) - it = df.iter_partitions() - part = next(it) - if daft.context.get_context().runner_config.name == "ray": - import ray + it = df.iter_partitions() + part = next(it) + if daft.context.get_context().runner_config.name == "ray": + import ray - part = ray.get(part) - part = part.to_pydict() + part = ray.get(part) + part = part.to_pydict() - assert part == {"a": [0, 1], "b": [0, 1]} + assert part == {"a": [0, 1], "b": [0, 1]} - # Ensure the exception does trigger if execution continues. - with pytest.raises(RuntimeError) as exc_info: - res = list(it) - if daft.context.get_context().runner_config.name == "ray": - ray.get(res) + # Ensure the exception does trigger if execution continues. 
+ with pytest.raises(RuntimeError) as exc_info: + res = list(it) + if daft.context.get_context().runner_config.name == "ray": + ray.get(res) - # Ray's wrapping of the exception loses information about the `.cause`, but preserves it in the string error message - if daft.context.get_context().runner_config.name == "ray": - assert "MockException" in str(exc_info.value) - else: - assert isinstance(exc_info.value.__cause__, MockException) + # Ray's wrapping of the exception loses information about the `.cause`, but preserves it in the string error message + if daft.context.get_context().runner_config.name == "ray": + assert "MockException" in str(exc_info.value) + else: + assert isinstance(exc_info.value.__cause__, MockException) diff --git a/tests/dataframe/test_joins.py b/tests/dataframe/test_joins.py index 4b08abea61..ceac56283f 100644 --- a/tests/dataframe/test_joins.py +++ b/tests/dataframe/test_joins.py @@ -72,7 +72,7 @@ def test_rename_join_keys_in_dataframe(make_df): indirect=True, ) @pytest.mark.parametrize("join_type", ["inner", "left", "right", "outer"]) -def test_joins(join_strategy, join_type, make_df, n_partitions: int): +def test_joins(join_strategy, join_type, make_df, n_partitions: int, with_morsel_size): skip_invalid_join_strategies(join_strategy, join_type) df = make_df( @@ -104,7 +104,7 @@ def test_joins(join_strategy, join_type, make_df, n_partitions: int): indirect=True, ) @pytest.mark.parametrize("join_type", ["inner", "left", "right", "outer"]) -def test_multicol_joins(join_strategy, join_type, make_df, n_partitions: int): +def test_multicol_joins(join_strategy, join_type, make_df, n_partitions: int, with_morsel_size): skip_invalid_join_strategies(join_strategy, join_type) df = make_df( @@ -138,7 +138,7 @@ def test_multicol_joins(join_strategy, join_type, make_df, n_partitions: int): indirect=True, ) @pytest.mark.parametrize("join_type", ["inner", "left", "right", "outer"]) -def test_dupes_join_key(join_strategy, join_type, make_df, n_partitions: int): +def test_dupes_join_key(join_strategy, join_type, make_df, n_partitions: int, with_morsel_size): skip_invalid_join_strategies(join_strategy, join_type) df = make_df( @@ -151,7 +151,7 @@ def test_dupes_join_key(join_strategy, join_type, make_df, n_partitions: int): ) joined = df.join(df, on="A", strategy=join_strategy, how=join_type) - joined = joined.sort(["A", "B"]) + joined = joined.sort(["A", "B", "right.B"]) joined_data = joined.to_pydict() assert joined_data == { @@ -168,7 +168,7 @@ def test_dupes_join_key(join_strategy, join_type, make_df, n_partitions: int): indirect=True, ) @pytest.mark.parametrize("join_type", ["inner", "left", "right", "outer"]) -def test_multicol_dupes_join_key(join_strategy, join_type, make_df, n_partitions: int): +def test_multicol_dupes_join_key(join_strategy, join_type, make_df, n_partitions: int, with_morsel_size): skip_invalid_join_strategies(join_strategy, join_type) df = make_df( @@ -182,14 +182,14 @@ def test_multicol_dupes_join_key(join_strategy, join_type, make_df, n_partitions ) joined = df.join(df, on=["A", "B"], strategy=join_strategy, how=join_type) - joined = joined.sort(["A", "B", "C"]) + joined = joined.sort(["A", "B", "C", "right.C"]) joined_data = joined.to_pydict() assert joined_data == { "A": [1, 1, 1, 1, 2, 2, 2, 2, 3, 3], "B": ["a"] * 4 + ["b"] * 4 + ["c", "d"], "C": [0, 0, 1, 1, 0, 0, 1, 1, 1, 0], - "right.C": [1, 0, 1, 0, 1, 0, 1, 0, 1, 0], + "right.C": [0, 1, 0, 1, 0, 1, 0, 1, 1, 0], } @@ -200,7 +200,7 @@ def test_multicol_dupes_join_key(join_strategy, join_type, 
make_df, n_partitions indirect=True, ) @pytest.mark.parametrize("join_type", ["inner", "left", "right", "outer"]) -def test_joins_all_same_key(join_strategy, join_type, make_df, n_partitions: int): +def test_joins_all_same_key(join_strategy, join_type, make_df, n_partitions: int, with_morsel_size): skip_invalid_join_strategies(join_strategy, join_type) df = make_df( @@ -213,7 +213,7 @@ def test_joins_all_same_key(join_strategy, join_type, make_df, n_partitions: int ) joined = df.join(df, on="A", strategy=join_strategy, how=join_type) - joined = joined.sort(["A", "B"]) + joined = joined.sort(["A", "B", "right.B"]) joined_data = joined.to_pydict() assert joined_data == { @@ -274,7 +274,9 @@ def test_joins_all_same_key(join_strategy, join_type, make_df, n_partitions: int ), ], ) -def test_joins_no_overlap_disjoint(join_strategy, join_type, flip, expected, make_df, n_partitions: int): +def test_joins_no_overlap_disjoint( + join_strategy, join_type, flip, expected, make_df, n_partitions: int, with_morsel_size +): skip_invalid_join_strategies(join_strategy, join_type) df1 = make_df( @@ -355,7 +357,9 @@ def test_joins_no_overlap_disjoint(join_strategy, join_type, flip, expected, mak ), ], ) -def test_joins_no_overlap_interleaved(join_strategy, join_type, flip, expected, make_df, n_partitions: int): +def test_joins_no_overlap_interleaved( + join_strategy, join_type, flip, expected, make_df, n_partitions: int, with_morsel_size +): skip_invalid_join_strategies(join_strategy, join_type) df1 = make_df( @@ -394,7 +398,7 @@ def test_joins_no_overlap_interleaved(join_strategy, join_type, flip, expected, indirect=True, ) @pytest.mark.parametrize("join_type", ["inner", "left", "right", "outer"]) -def test_limit_after_join(join_strategy, join_type, make_df, n_partitions: int): +def test_limit_after_join(join_strategy, join_type, make_df, n_partitions: int, with_morsel_size): skip_invalid_join_strategies(join_strategy, join_type) data = { @@ -461,7 +465,7 @@ def test_limit_after_join(join_strategy, join_type, make_df, n_partitions: int): ), ], ) -def test_join_with_null(join_strategy, join_type, expected, make_df, repartition_nparts): +def test_join_with_null(join_strategy, join_type, expected, make_df, repartition_nparts, with_morsel_size): skip_invalid_join_strategies(join_strategy, join_type) daft_df = make_df( @@ -527,7 +531,7 @@ def test_join_with_null(join_strategy, join_type, expected, make_df, repartition ), ], ) -def test_join_with_null_multikey(join_strategy, join_type, expected, make_df, repartition_nparts): +def test_join_with_null_multikey(join_strategy, join_type, expected, make_df, repartition_nparts, with_morsel_size): skip_invalid_join_strategies(join_strategy, join_type) daft_df = make_df( @@ -610,7 +614,9 @@ def test_join_with_null_multikey(join_strategy, join_type, expected, make_df, re ), ], ) -def test_join_with_null_asymmetric_multikey(join_strategy, join_type, expected, make_df, repartition_nparts): +def test_join_with_null_asymmetric_multikey( + join_strategy, join_type, expected, make_df, repartition_nparts, with_morsel_size +): skip_invalid_join_strategies(join_strategy, join_type) daft_df = make_df( @@ -685,7 +691,7 @@ def test_join_with_null_asymmetric_multikey(join_strategy, join_type, expected, ), ], ) -def test_join_all_null(join_strategy, join_type, expected, make_df, repartition_nparts): +def test_join_all_null(join_strategy, join_type, expected, make_df, repartition_nparts, with_morsel_size): skip_invalid_join_strategies(join_strategy, join_type) daft_df = make_df( @@ 
-719,7 +725,7 @@ def test_join_all_null(join_strategy, join_type, expected, make_df, repartition_ indirect=True, ) @pytest.mark.parametrize("join_type", ["inner", "left", "right", "outer"]) -def test_join_null_type_column(join_strategy, join_type, make_df): +def test_join_null_type_column(join_strategy, join_type, make_df, with_morsel_size): skip_invalid_join_strategies(join_strategy, join_type) daft_df = make_df( @@ -764,7 +770,7 @@ def test_join_null_type_column(join_strategy, join_type, make_df): ), ], ) -def test_join_semi_anti(join_strategy, join_type, expected, make_df, repartition_nparts): +def test_join_semi_anti(join_strategy, join_type, expected, make_df, repartition_nparts, with_morsel_size): skip_invalid_join_strategies(join_strategy, join_type) daft_df1 = make_df( @@ -817,7 +823,9 @@ def test_join_semi_anti(join_strategy, join_type, expected, make_df, repartition ), ], ) -def test_join_semi_anti_different_names(join_strategy, join_type, expected, make_df, repartition_nparts): +def test_join_semi_anti_different_names( + join_strategy, join_type, expected, make_df, repartition_nparts, with_morsel_size +): skip_invalid_join_strategies(join_strategy, join_type) daft_df1 = make_df( @@ -852,7 +860,7 @@ def test_join_semi_anti_different_names(join_strategy, join_type, expected, make @pytest.mark.parametrize("join_type", ["inner", "left", "right", "outer"]) -def test_join_true_join_keys(join_type, make_df): +def test_join_true_join_keys(join_type, make_df, with_morsel_size): daft_df = make_df( { "id": [1, 2, 3], @@ -924,7 +932,7 @@ def test_join_true_join_keys(join_type, make_df): ), ], ) -def test_join_with_alias_in_key(join_strategy, join_type, expected, make_df): +def test_join_with_alias_in_key(join_strategy, join_type, expected, make_df, with_morsel_size): skip_invalid_join_strategies(join_strategy, join_type) daft_df1 = make_df( @@ -995,7 +1003,7 @@ def test_join_with_alias_in_key(join_strategy, join_type, expected, make_df): ), ], ) -def test_join_same_name_alias(join_strategy, join_type, expected, make_df): +def test_join_same_name_alias(join_strategy, join_type, expected, make_df, with_morsel_size): skip_invalid_join_strategies(join_strategy, join_type) daft_df1 = make_df( @@ -1066,7 +1074,7 @@ def test_join_same_name_alias(join_strategy, join_type, expected, make_df): ), ], ) -def test_join_same_name_alias_with_compute(join_strategy, join_type, expected, make_df): +def test_join_same_name_alias_with_compute(join_strategy, join_type, expected, make_df, with_morsel_size): skip_invalid_join_strategies(join_strategy, join_type) daft_df1 = make_df( @@ -1098,7 +1106,7 @@ def test_join_same_name_alias_with_compute(join_strategy, join_type, expected, m ("_right", "prefix.", "prefix.score_right"), ], ) -def test_join_suffix_and_prefix(suffix, prefix, expected, make_df): +def test_join_suffix_and_prefix(suffix, prefix, expected, make_df, with_morsel_size): df1 = daft.from_pydict({"idx": [1, 2], "val": [10, 20]}) df2 = daft.from_pydict({"idx": [3], "score": [0.1]}) df3 = daft.from_pydict({"idx": [1], "score": [0.1]}) diff --git a/tests/dataframe/test_map_groups.py b/tests/dataframe/test_map_groups.py index 4f0f2e29ec..0024c02797 100644 --- a/tests/dataframe/test_map_groups.py +++ b/tests/dataframe/test_map_groups.py @@ -6,7 +6,7 @@ @pytest.mark.parametrize("repartition_nparts", [1, 2, 4]) -def test_map_groups(make_df, repartition_nparts): +def test_map_groups(make_df, repartition_nparts, with_morsel_size): daft_df = make_df( { "group": [1, 1, 2], @@ -38,7 +38,7 @@ def udf(a, b): 
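The stricter sort keys in the join tests above (e.g. sorting on `right.B` and `right.C`) lean on the fact that a self-join surfaces the right side's colliding columns under a `right.` prefix; a small illustration:

```python
import daft

# Colliding non-key columns from the right side come back as "right.<name>",
# so sorting on them pins down an otherwise ambiguous row order.
df = daft.from_pydict({"A": [1, 1, 2], "B": [0, 1, 0]})
joined = df.join(df, on="A", how="inner").sort(["A", "B", "right.B"])
print(joined.to_pydict())
```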
@pytest.mark.parametrize("repartition_nparts", [1, 2, 3]) @pytest.mark.parametrize("output_when_empty", [[], [1], [1, 2]]) -def test_map_groups_more_than_one_output_row(make_df, repartition_nparts, output_when_empty): +def test_map_groups_more_than_one_output_row(make_df, repartition_nparts, output_when_empty, with_morsel_size): daft_df = make_df( { "group": [1, 2], @@ -63,7 +63,7 @@ def udf(a): @pytest.mark.parametrize("repartition_nparts", [1, 2, 4]) -def test_map_groups_single_group(make_df, repartition_nparts): +def test_map_groups_single_group(make_df, repartition_nparts, with_morsel_size): daft_df = make_df( { "group": [1, 1, 1], @@ -88,7 +88,7 @@ def udf(a): @pytest.mark.parametrize("repartition_nparts", [1, 5, 11]) -def test_map_groups_double_group_by(make_df, repartition_nparts): +def test_map_groups_double_group_by(make_df, repartition_nparts, with_morsel_size): daft_df = make_df( { "group_1": [1, 1, 1, 1, 1, 2, 2, 2, 2, 2], @@ -118,7 +118,7 @@ def udf(a): @pytest.mark.parametrize("repartition_nparts", [1, 2, 5]) -def test_map_groups_compound_input(make_df, repartition_nparts): +def test_map_groups_compound_input(make_df, repartition_nparts, with_morsel_size): daft_df = make_df( { "group": [1, 1, 2, 2], @@ -143,7 +143,7 @@ def udf(data): @pytest.mark.parametrize("repartition_nparts", [1, 2, 4]) -def test_map_groups_with_alias(make_df, repartition_nparts): +def test_map_groups_with_alias(make_df, repartition_nparts, with_morsel_size): daft_df = make_df( { "group": [1, 1, 2], diff --git a/tests/dataframe/test_pivot.py b/tests/dataframe/test_pivot.py index fcd88c9c51..7c2b2d45a3 100644 --- a/tests/dataframe/test_pivot.py +++ b/tests/dataframe/test_pivot.py @@ -2,7 +2,7 @@ @pytest.mark.parametrize("repartition_nparts", [1, 2, 5]) -def test_pivot(make_df, repartition_nparts): +def test_pivot(make_df, repartition_nparts, with_morsel_size): daft_df = make_df( { "group": ["A", "A", "B", "B"], @@ -23,7 +23,7 @@ def test_pivot(make_df, repartition_nparts): @pytest.mark.parametrize("repartition_nparts", [1, 2, 5]) -def test_pivot_with_col_names(make_df, repartition_nparts): +def test_pivot_with_col_names(make_df, repartition_nparts, with_morsel_size): daft_df = make_df( { "group": ["A", "A", "B", "B"], @@ -50,7 +50,7 @@ def test_pivot_with_col_names(make_df, repartition_nparts): @pytest.mark.parametrize("repartition_nparts", [1, 2, 5]) -def test_pivot_with_col_names_subset(make_df, repartition_nparts): +def test_pivot_with_col_names_subset(make_df, repartition_nparts, with_morsel_size): daft_df = make_df( { "group": ["A", "A", "B", "B"], @@ -76,7 +76,7 @@ def test_pivot_with_col_names_subset(make_df, repartition_nparts): @pytest.mark.parametrize("repartition_nparts", [1, 2, 5]) -def test_pivot_with_col_names_superset(make_df, repartition_nparts): +def test_pivot_with_col_names_superset(make_df, repartition_nparts, with_morsel_size): daft_df = make_df( { "group": ["A", "A", "B", "B"], @@ -104,7 +104,7 @@ def test_pivot_with_col_names_superset(make_df, repartition_nparts): @pytest.mark.parametrize("repartition_nparts", [1, 2, 5]) -def test_pivot_with_nulls(make_df, repartition_nparts): +def test_pivot_with_nulls(make_df, repartition_nparts, with_morsel_size): daft_df = make_df( { "group": ["A", None, "B", "B"], @@ -136,7 +136,7 @@ def test_pivot_with_nulls(make_df, repartition_nparts): ("count", {"group": ["A", "B"], "1": [2, 2]}), ], ) -def test_pivot_with_different_aggs(make_df, repartition_nparts, agg_fn, expected): +def test_pivot_with_different_aggs(make_df, repartition_nparts, agg_fn, 
expected, with_morsel_size): daft_df = make_df( { "group": ["A", "A", "B", "B"], @@ -151,7 +151,7 @@ def test_pivot_with_different_aggs(make_df, repartition_nparts, agg_fn, expected @pytest.mark.parametrize("repartition_nparts", [1, 2, 5]) -def test_pivot_with_multiple_group_by(make_df, repartition_nparts): +def test_pivot_with_multiple_group_by(make_df, repartition_nparts, with_morsel_size): daft_df = make_df( { "group1": ["A", "A", "A", "B", "B", "B"], @@ -173,7 +173,7 @@ def test_pivot_with_multiple_group_by(make_df, repartition_nparts): @pytest.mark.parametrize("repartition_nparts", [1, 2, 5]) -def test_pivot_with_downstream_ops(make_df, repartition_nparts): +def test_pivot_with_downstream_ops(make_df, repartition_nparts, with_morsel_size): daft_df = make_df( { "group": ["A", "A", "B", "B"], diff --git a/tests/dataframe/test_sort.py b/tests/dataframe/test_sort.py index 8a831a2bcf..a6e325de7e 100644 --- a/tests/dataframe/test_sort.py +++ b/tests/dataframe/test_sort.py @@ -34,7 +34,7 @@ def test_disallowed_sort_bytes(make_df): @pytest.mark.parametrize("desc", [True, False]) @pytest.mark.parametrize("n_partitions", [1, 3]) -def test_single_float_col_sort(make_df, desc: bool, n_partitions: int): +def test_single_float_col_sort(make_df, desc: bool, n_partitions: int, with_morsel_size): df = make_df({"A": [1.0, None, 3.0, float("nan"), 2.0]}, repartition=n_partitions) df = df.sort("A", desc=desc) sorted_data = df.to_pydict() @@ -51,7 +51,7 @@ def _replace_nan_with_string(items): @pytest.mark.skip(reason="Issue: https://github.com/Eventual-Inc/Daft/issues/546") @pytest.mark.parametrize("n_partitions", [1, 3]) -def test_multi_float_col_sort(make_df, n_partitions: int): +def test_multi_float_col_sort(make_df, n_partitions: int, with_morsel_size): df = make_df( { "A": [1.0, 1.0, None, None, float("nan"), float("nan"), float("nan")], @@ -98,7 +98,7 @@ def _replace_nan_with_string(items): @pytest.mark.parametrize("desc", [True, False]) @pytest.mark.parametrize("n_partitions", [1, 3]) -def test_single_string_col_sort(make_df, desc: bool, n_partitions: int): +def test_single_string_col_sort(make_df, desc: bool, n_partitions: int, with_morsel_size): df = make_df({"A": ["0", None, "1", "", "01"]}, repartition=n_partitions) df = df.sort("A", desc=desc) sorted_data = df.to_pydict() @@ -112,7 +112,7 @@ def test_single_string_col_sort(make_df, desc: bool, n_partitions: int): @pytest.mark.parametrize("desc", [True, False]) @pytest.mark.parametrize("n_partitions", [1, 3, 4]) -def test_single_bool_col_sort(make_df, desc: bool, n_partitions: int): +def test_single_bool_col_sort(make_df, desc: bool, n_partitions: int, with_morsel_size): df = make_df({"A": [True, None, False, True, False]}, repartition=n_partitions) df = df.sort("A", desc=desc) sorted_data = df.to_pydict() @@ -125,7 +125,7 @@ def test_single_bool_col_sort(make_df, desc: bool, n_partitions: int): @pytest.mark.parametrize("n_partitions", [1, 3, 4]) -def test_multi_bool_col_sort(make_df, n_partitions: int): +def test_multi_bool_col_sort(make_df, n_partitions: int, with_morsel_size): df = make_df( { "A": [True, False, None, False, True], @@ -151,7 +151,7 @@ def test_multi_bool_col_sort(make_df, n_partitions: int): @pytest.mark.parametrize("repartition_nparts", [1, 2, 4]) -def test_int_sort_with_nulls(make_df, repartition_nparts): +def test_int_sort_with_nulls(make_df, repartition_nparts, with_morsel_size): daft_df = make_df( { "id": [2, None, 1], @@ -173,7 +173,7 @@ def test_int_sort_with_nulls(make_df, repartition_nparts): 
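The pivot tests here aggregate values within each group before fanning the distinct pivot values out into columns (the `agg_fn` parametrization above exercises sum, mean, min, max, and count). A hedged sketch of that shape; the exact `DataFrame.pivot` argument order is assumed here, not shown in this hunk:

```python
import daft

df = daft.from_pydict({
    "group": ["A", "A", "B", "B"],
    "pivot": [1, 1, 1, 1],
    "value": [10, 20, 30, 40],
})
# Assumed signature: pivot(group_by, pivot_col, value_col, agg_fn).
out = df.pivot("group", "pivot", "value", "sum").sort("group").to_pydict()
# Distinct pivot values become column names,
# e.g. {"group": ["A", "B"], "1": [30, 70]}.
```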
@pytest.mark.parametrize("repartition_nparts", [1, 2, 4]) -def test_str_sort_with_nulls(make_df, repartition_nparts): +def test_str_sort_with_nulls(make_df, repartition_nparts, with_morsel_size): daft_df = make_df( { "id": [1, None, 2], @@ -194,7 +194,7 @@ def test_str_sort_with_nulls(make_df, repartition_nparts): @pytest.mark.parametrize("repartition_nparts", [1, 4, 6]) -def test_sort_with_nulls_multikey(make_df, repartition_nparts): +def test_sort_with_nulls_multikey(make_df, repartition_nparts, with_morsel_size): daft_df = make_df( { "id1": [2, None, 2, None, 1], @@ -217,7 +217,7 @@ def test_sort_with_nulls_multikey(make_df, repartition_nparts): @pytest.mark.parametrize("repartition_nparts", [1, 2, 4]) -def test_sort_with_all_nulls(make_df, repartition_nparts): +def test_sort_with_all_nulls(make_df, repartition_nparts, with_morsel_size): daft_df = make_df( { "id": [None, None, None], @@ -234,7 +234,7 @@ def test_sort_with_all_nulls(make_df, repartition_nparts): @pytest.mark.parametrize("repartition_nparts", [1, 2]) -def test_sort_with_empty(make_df, repartition_nparts): +def test_sort_with_empty(make_df, repartition_nparts, with_morsel_size): daft_df = make_df( { "id": [1], diff --git a/tests/dataframe/test_stddev.py b/tests/dataframe/test_stddev.py index 464d20bd41..0c24cbc31c 100644 --- a/tests/dataframe/test_stddev.py +++ b/tests/dataframe/test_stddev.py @@ -49,7 +49,7 @@ def stddev(nums) -> float: @pytest.mark.parametrize("data_and_expected", TESTS) -def test_stddev_with_single_partition(data_and_expected): +def test_stddev_with_single_partition(data_and_expected, with_morsel_size): data, expected = data_and_expected df = daft.from_pydict({"a": data}) result = df.agg(daft.col("a").stddev()).collect() @@ -65,7 +65,7 @@ def test_stddev_with_single_partition(data_and_expected): @pytest.mark.parametrize("data_and_expected", TESTS) -def test_stddev_with_multiple_partitions(data_and_expected): +def test_stddev_with_multiple_partitions(data_and_expected, with_morsel_size): data, expected = data_and_expected df = daft.from_pydict({"a": data}).into_partitions(2) result = df.agg(daft.col("a").stddev()).collect() @@ -99,7 +99,7 @@ def unzip_rows(rows: list) -> Tuple[List, List]: @pytest.mark.parametrize("data_and_expected", GROUPED_TESTS) -def test_grouped_stddev_with_single_partition(data_and_expected): +def test_grouped_stddev_with_single_partition(data_and_expected, with_morsel_size): nums, expected_keys, expected_stddevs = data_and_expected expected_df = daft.from_pydict({"keys": expected_keys, "data": expected_stddevs}) keys, data = unzip_rows(nums) @@ -122,7 +122,7 @@ def test_grouped_stddev_with_single_partition(data_and_expected): @pytest.mark.parametrize("data_and_expected", GROUPED_TESTS) -def test_grouped_stddev_with_multiple_partitions(data_and_expected): +def test_grouped_stddev_with_multiple_partitions(data_and_expected, with_morsel_size): nums, expected_keys, expected_stddevs = data_and_expected expected_df = daft.from_pydict({"keys": expected_keys, "data": expected_stddevs}) keys, data = unzip_rows(nums) diff --git a/tests/dataframe/test_unpivot.py b/tests/dataframe/test_unpivot.py index b4c7a84cc5..1767306fc8 100644 --- a/tests/dataframe/test_unpivot.py +++ b/tests/dataframe/test_unpivot.py @@ -4,8 +4,8 @@ from daft.datatype import DataType -@pytest.mark.parametrize("n_partitions", [1, 2, 4]) -def test_unpivot(make_df, n_partitions): +@pytest.mark.parametrize("n_partitions", [2]) +def test_unpivot(make_df, n_partitions, with_morsel_size): df = make_df( { "id": ["x", "y", "z"], 
@@ -16,7 +16,7 @@ def test_unpivot(make_df, n_partitions): ) df = df.unpivot("id", ["a", "b"]) - df = df.sort("id") + df = df.sort(["id", "variable"]) df = df.collect() expected = { @@ -29,7 +29,7 @@ def test_unpivot(make_df, n_partitions): @pytest.mark.parametrize("n_partitions", [1, 2, 4]) -def test_unpivot_no_values(make_df, n_partitions): +def test_unpivot_no_values(make_df, n_partitions, with_morsel_size): df = make_df( { "id": ["x", "y", "z"], @@ -40,7 +40,7 @@ def test_unpivot_no_values(make_df, n_partitions): ) df = df.unpivot("id") - df = df.sort("id") + df = df.sort(["id", "variable"]) df = df.collect() expected = { @@ -53,7 +53,7 @@ def test_unpivot_no_values(make_df, n_partitions): @pytest.mark.parametrize("n_partitions", [1, 2, 4]) -def test_unpivot_different_types(make_df, n_partitions): +def test_unpivot_different_types(make_df, n_partitions, with_morsel_size): df = make_df( { "id": ["x", "y", "z"], @@ -64,7 +64,7 @@ def test_unpivot_different_types(make_df, n_partitions): ) df = df.unpivot("id", ["a", "b"]) - df = df.sort("id") + df = df.sort(["id", "variable"]) df = df.collect() expected = { @@ -77,7 +77,7 @@ def test_unpivot_different_types(make_df, n_partitions): @pytest.mark.parametrize("n_partitions", [1, 2, 4]) -def test_unpivot_incompatible_types(make_df, n_partitions): +def test_unpivot_incompatible_types(make_df, n_partitions, with_morsel_size): df = make_df( { "id": ["x", "y", "z"], @@ -92,7 +92,7 @@ def test_unpivot_incompatible_types(make_df, n_partitions): @pytest.mark.parametrize("n_partitions", [1, 2, 4]) -def test_unpivot_nulls(make_df, n_partitions): +def test_unpivot_nulls(make_df, n_partitions, with_morsel_size): df = make_df( { "id": ["x", "y", "z"], @@ -103,7 +103,7 @@ def test_unpivot_nulls(make_df, n_partitions): ) df = df.unpivot("id", ["a", "b"]) - df = df.sort("id") + df = df.sort(["id", "variable"]) df = df.collect() expected = { @@ -116,7 +116,7 @@ def test_unpivot_nulls(make_df, n_partitions): @pytest.mark.parametrize("n_partitions", [1, 2, 4]) -def test_unpivot_null_column(make_df, n_partitions): +def test_unpivot_null_column(make_df, n_partitions, with_morsel_size): df = make_df( { "id": ["x", "y", "z"], @@ -127,7 +127,7 @@ def test_unpivot_null_column(make_df, n_partitions): ) df = df.unpivot("id", ["a", "b"]) - df = df.sort("id") + df = df.sort(["id", "variable"]) df = df.collect() expected = { @@ -140,7 +140,7 @@ def test_unpivot_null_column(make_df, n_partitions): @pytest.mark.parametrize("n_partitions", [1, 2, 4]) -def test_unpivot_multiple_ids(make_df, n_partitions): +def test_unpivot_multiple_ids(make_df, n_partitions, with_morsel_size): df = make_df( { "id1": ["x", "y", "z"], @@ -152,7 +152,7 @@ def test_unpivot_multiple_ids(make_df, n_partitions): ) df = df.unpivot(["id1", "id2"], ["a", "b"]) - df = df.sort("id1") + df = df.sort(["id1", "id2", "variable"]) df = df.collect() expected = { @@ -166,7 +166,7 @@ def test_unpivot_multiple_ids(make_df, n_partitions): @pytest.mark.parametrize("n_partitions", [1, 2, 4]) -def test_unpivot_no_ids(make_df, n_partitions): +def test_unpivot_no_ids(make_df, n_partitions, with_morsel_size): df = make_df( { "a": [1, 3, 5], @@ -188,7 +188,7 @@ def test_unpivot_no_ids(make_df, n_partitions): @pytest.mark.parametrize("n_partitions", [1, 2, 4]) -def test_unpivot_expr(make_df, n_partitions): +def test_unpivot_expr(make_df, n_partitions, with_morsel_size): df = make_df( { "id": ["x", "y", "z"], @@ -199,13 +199,13 @@ def test_unpivot_expr(make_df, n_partitions): ) df = df.unpivot("id", ["a", "b", (col("a") + 
col("b")).alias("a_plus_b")]) - df = df.sort("id") + df = df.sort(["id", "variable"]) df = df.collect() expected = { "id": ["x", "x", "x", "y", "y", "y", "z", "z", "z"], - "variable": ["a", "b", "a_plus_b", "a", "b", "a_plus_b", "a", "b", "a_plus_b"], - "value": [1, 2, 3, 3, 4, 7, 5, 6, 11], + "variable": ["a", "a_plus_b", "b", "a", "a_plus_b", "b", "a", "a_plus_b", "b"], + "value": [1, 3, 2, 3, 7, 4, 5, 11, 6], } assert df.to_pydict() == expected @@ -244,7 +244,7 @@ def test_unpivot_empty_partition(make_df): df = df.into_partitions(4) df = df.unpivot("id", ["a", "b"]) - df = df.sort("id") + df = df.sort(["id", "variable"]) df = df.collect() expected = {