Skip to content

Commit

Permalink
[CHORE] Enable sources to return empty tables (#2915)
Browse files Browse the repository at this point in the history
Sources in the native executor should still return empty tables if they
didn't read anything.

This unlocks a bunch of tests.

---------

Co-authored-by: Colin Ho <[email protected]>
Co-authored-by: Cory Grinstead <[email protected]>
Co-authored-by: Colin Ho <[email protected]>
  • Loading branch information
4 people authored Sep 25, 2024
1 parent a9fdd19 commit b1ea3b9
Show file tree
Hide file tree
Showing 6 changed files with 19 additions and 21 deletions.
4 changes: 3 additions & 1 deletion src/daft-local-execution/src/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,9 @@ pub fn physical_plan_to_pipeline(
}
LocalPhysicalPlan::InMemoryScan(InMemoryScan { info, .. }) => {
let partitions = psets.get(&info.cache_key).expect("Cache key not found");
InMemorySource::new(partitions.clone()).boxed().into()
InMemorySource::new(partitions.clone(), info.source_schema.clone())
.boxed()
.into()
}
LocalPhysicalPlan::Project(Project {
input, projection, ..
Expand Down
13 changes: 9 additions & 4 deletions src/daft-local-execution/src/sources/in_memory.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::sync::Arc;

use daft_core::prelude::SchemaRef;
use daft_io::IOStatsRef;
use daft_micropartition::MicroPartition;
use tracing::instrument;
Expand All @@ -9,11 +10,12 @@ use crate::{sources::source::SourceStream, ExecutionRuntimeHandle};

pub struct InMemorySource {
data: Vec<Arc<MicroPartition>>,
schema: SchemaRef,
}

impl InMemorySource {
pub fn new(data: Vec<Arc<MicroPartition>>) -> Self {
Self { data }
pub fn new(data: Vec<Arc<MicroPartition>>, schema: SchemaRef) -> Self {
Self { data, schema }
}
pub fn boxed(self) -> Box<dyn Source> {
Box::new(self) as Box<dyn Source>
Expand All @@ -28,8 +30,11 @@ impl Source for InMemorySource {
_runtime_handle: &mut ExecutionRuntimeHandle,
_io_stats: IOStatsRef,
) -> crate::Result<SourceStream<'static>> {
let data = self.data.clone();
Ok(Box::pin(futures::stream::iter(data)))
if self.data.is_empty() {
let empty = Arc::new(MicroPartition::empty(Some(self.schema.clone())));
return Ok(Box::pin(futures::stream::once(async { empty })));
}
Ok(Box::pin(futures::stream::iter(self.data.clone())))
}
fn name(&self) -> &'static str {
"InMemory"
Expand Down
7 changes: 7 additions & 0 deletions src/daft-local-execution/src/sources/scan_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,16 @@ impl ScanTaskSource {
maintain_order: bool,
io_stats: IOStatsRef,
) -> DaftResult<()> {
let schema = scan_task.materialized_schema();
let mut stream = stream_scan_task(scan_task, Some(io_stats), maintain_order).await?;
let mut has_data = false;
while let Some(partition) = stream.next().await {
let _ = sender.send(partition?).await;
has_data = true;
}
if !has_data {
let empty = Arc::new(MicroPartition::empty(Some(schema.clone())));
let _ = sender.send(empty).await;
}
Ok(())
}
Expand Down
5 changes: 0 additions & 5 deletions tests/cookbook/test_pandas_cookbook.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,10 @@
import pytest

import daft
from daft import context
from daft.datatype import DataType
from daft.expressions import col, lit
from tests.conftest import assert_df_equals

pytestmark = pytest.mark.skipif(
context.get_context().daft_execution_config.enable_native_executor is True,
reason="Native executor fails for these tests",
)
###
# Idioms: if-then
###
Expand Down
6 changes: 0 additions & 6 deletions tests/dataframe/test_repr.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,8 @@
from PIL import Image

import daft
from daft import context
from tests.utils import ANSI_ESCAPE, TD_STYLE, TH_STYLE

pytestmark = pytest.mark.skipif(
context.get_context().daft_execution_config.enable_native_executor is True,
reason="Native executor fails for these tests",
)

ROW_DIVIDER_REGEX = re.compile(r"╭─+┬*─*╮|├╌+┼*╌+┤")
SHOWING_N_ROWS_REGEX = re.compile(r".*\(Showing first (\d+) of (\d+) rows\).*")
UNMATERIALIZED_REGEX = re.compile(r".*\(No data to display: Dataframe not materialized\).*")
Expand Down
5 changes: 0 additions & 5 deletions tests/dataframe/test_sort.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,9 @@
import pyarrow as pa
import pytest

from daft import context
from daft.datatype import DataType
from daft.errors import ExpressionTypeError

pytestmark = pytest.mark.skipif(
context.get_context().daft_execution_config.enable_native_executor is True,
reason="Native executor fails for these tests",
)
###
# Validation tests
###
Expand Down

0 comments on commit b1ea3b9

Please sign in to comment.