diff --git a/daft/runners/ray_runner.py b/daft/runners/ray_runner.py index a73fdd0781..60aa80992d 100644 --- a/daft/runners/ray_runner.py +++ b/daft/runners/ray_runner.py @@ -560,7 +560,9 @@ def place_in_queue(item): # If it is a no-op task, just run it locally immediately. elif len(next_step.instructions) == 0: logger.debug("Running task synchronously in main thread: %s", next_step) - assert isinstance(next_step, SingleOutputPartitionTask) + assert ( + len(next_step.partial_metadatas) == 1 + ), "No-op tasks must have one output by definition, since there are no instructions to run" [single_partial] = next_step.partial_metadatas if single_partial.num_rows is None: [single_meta] = ray.get(get_metas.remote(next_step.inputs)) @@ -577,6 +579,7 @@ def place_in_queue(item): ) ] ) + next_step.set_result( [RayMaterializedResult(partition, accessor, 0) for partition in next_step.inputs] ) diff --git a/tests/dataframe/test_repartition.py b/tests/dataframe/test_repartition.py index ff864a73f4..3c559131cc 100644 --- a/tests/dataframe/test_repartition.py +++ b/tests/dataframe/test_repartition.py @@ -11,3 +11,15 @@ def test_into_partitions_coalesce(make_df) -> None: data = {"foo": list(range(100))} df = make_df(data).into_partitions(20).into_partitions(1).collect() assert df.to_pydict() == data + + +def test_into_partitions_some_no_split(make_df) -> None: + data = {"foo": [1, 2, 3]} + + # Materialize as 3 partitions + df = make_df(data).into_partitions(3).collect() + + # Attempt to split into 4 partitions, so only 1 split occurs + df = df.into_partitions(4).collect() + + assert df.to_pydict() == data