From b1ea3b9749e01512f48dfd45f9899a329fc9799f Mon Sep 17 00:00:00 2001 From: Colin Ho Date: Wed, 25 Sep 2024 13:32:19 -0700 Subject: [PATCH] [CHORE] Enable sources to return empty tables (#2915) Sources in the native executor should still return empty tables if they didn't read anything. This unlocks a bunch of tests. --------- Co-authored-by: Colin Ho Co-authored-by: Cory Grinstead Co-authored-by: Colin Ho --- src/daft-local-execution/src/pipeline.rs | 4 +++- src/daft-local-execution/src/sources/in_memory.rs | 13 +++++++++---- src/daft-local-execution/src/sources/scan_task.rs | 7 +++++++ tests/cookbook/test_pandas_cookbook.py | 5 ----- tests/dataframe/test_repr.py | 6 ------ tests/dataframe/test_sort.py | 5 ----- 6 files changed, 19 insertions(+), 21 deletions(-) diff --git a/src/daft-local-execution/src/pipeline.rs b/src/daft-local-execution/src/pipeline.rs index 9f7da9b915..17146f216d 100644 --- a/src/daft-local-execution/src/pipeline.rs +++ b/src/daft-local-execution/src/pipeline.rs @@ -110,7 +110,9 @@ pub fn physical_plan_to_pipeline( } LocalPhysicalPlan::InMemoryScan(InMemoryScan { info, .. }) => { let partitions = psets.get(&info.cache_key).expect("Cache key not found"); - InMemorySource::new(partitions.clone()).boxed().into() + InMemorySource::new(partitions.clone(), info.source_schema.clone()) + .boxed() + .into() } LocalPhysicalPlan::Project(Project { input, projection, .. diff --git a/src/daft-local-execution/src/sources/in_memory.rs b/src/daft-local-execution/src/sources/in_memory.rs index 1212dd13cb..1bf08a8913 100644 --- a/src/daft-local-execution/src/sources/in_memory.rs +++ b/src/daft-local-execution/src/sources/in_memory.rs @@ -1,5 +1,6 @@ use std::sync::Arc; +use daft_core::prelude::SchemaRef; use daft_io::IOStatsRef; use daft_micropartition::MicroPartition; use tracing::instrument; @@ -9,11 +10,12 @@ use crate::{sources::source::SourceStream, ExecutionRuntimeHandle}; pub struct InMemorySource { data: Vec>, + schema: SchemaRef, } impl InMemorySource { - pub fn new(data: Vec>) -> Self { - Self { data } + pub fn new(data: Vec>, schema: SchemaRef) -> Self { + Self { data, schema } } pub fn boxed(self) -> Box { Box::new(self) as Box @@ -28,8 +30,11 @@ impl Source for InMemorySource { _runtime_handle: &mut ExecutionRuntimeHandle, _io_stats: IOStatsRef, ) -> crate::Result> { - let data = self.data.clone(); - Ok(Box::pin(futures::stream::iter(data))) + if self.data.is_empty() { + let empty = Arc::new(MicroPartition::empty(Some(self.schema.clone()))); + return Ok(Box::pin(futures::stream::once(async { empty }))); + } + Ok(Box::pin(futures::stream::iter(self.data.clone()))) } fn name(&self) -> &'static str { "InMemory" diff --git a/src/daft-local-execution/src/sources/scan_task.rs b/src/daft-local-execution/src/sources/scan_task.rs index f7374fa80a..7d36ba6a22 100644 --- a/src/daft-local-execution/src/sources/scan_task.rs +++ b/src/daft-local-execution/src/sources/scan_task.rs @@ -38,9 +38,16 @@ impl ScanTaskSource { maintain_order: bool, io_stats: IOStatsRef, ) -> DaftResult<()> { + let schema = scan_task.materialized_schema(); let mut stream = stream_scan_task(scan_task, Some(io_stats), maintain_order).await?; + let mut has_data = false; while let Some(partition) = stream.next().await { let _ = sender.send(partition?).await; + has_data = true; + } + if !has_data { + let empty = Arc::new(MicroPartition::empty(Some(schema.clone()))); + let _ = sender.send(empty).await; } Ok(()) } diff --git a/tests/cookbook/test_pandas_cookbook.py b/tests/cookbook/test_pandas_cookbook.py index 56838f7490..c852568f3f 100644 --- a/tests/cookbook/test_pandas_cookbook.py +++ b/tests/cookbook/test_pandas_cookbook.py @@ -7,15 +7,10 @@ import pytest import daft -from daft import context from daft.datatype import DataType from daft.expressions import col, lit from tests.conftest import assert_df_equals -pytestmark = pytest.mark.skipif( - context.get_context().daft_execution_config.enable_native_executor is True, - reason="Native executor fails for these tests", -) ### # Idioms: if-then ### diff --git a/tests/dataframe/test_repr.py b/tests/dataframe/test_repr.py index d72082ca3b..8e04421901 100644 --- a/tests/dataframe/test_repr.py +++ b/tests/dataframe/test_repr.py @@ -8,14 +8,8 @@ from PIL import Image import daft -from daft import context from tests.utils import ANSI_ESCAPE, TD_STYLE, TH_STYLE -pytestmark = pytest.mark.skipif( - context.get_context().daft_execution_config.enable_native_executor is True, - reason="Native executor fails for these tests", -) - ROW_DIVIDER_REGEX = re.compile(r"╭─+┬*─*╮|├╌+┼*╌+┤") SHOWING_N_ROWS_REGEX = re.compile(r".*\(Showing first (\d+) of (\d+) rows\).*") UNMATERIALIZED_REGEX = re.compile(r".*\(No data to display: Dataframe not materialized\).*") diff --git a/tests/dataframe/test_sort.py b/tests/dataframe/test_sort.py index e972c13831..8a831a2bcf 100644 --- a/tests/dataframe/test_sort.py +++ b/tests/dataframe/test_sort.py @@ -5,14 +5,9 @@ import pyarrow as pa import pytest -from daft import context from daft.datatype import DataType from daft.errors import ExpressionTypeError -pytestmark = pytest.mark.skipif( - context.get_context().daft_execution_config.enable_native_executor is True, - reason="Native executor fails for these tests", -) ### # Validation tests ###