implement empty scan

samster25 committed Dec 20, 2023
1 parent 8b8d452 commit d5214e2
Showing 6 changed files with 98 additions and 12 deletions.
27 changes: 27 additions & 0 deletions daft/execution/rust_physical_plan_shim.py
@@ -41,6 +41,16 @@ def scan_with_tasks(
        )
        yield scan_step

def empty_scan(
    schema: Schema,
) -> physical_plan.InProgressPhysicalPlan[PartitionT]:
    """Yield a plan that produces a single empty partition with the given schema."""
    scan_step = execution_step.PartitionTaskBuilder[PartitionT](
        inputs=[],
        partial_metadatas=None,
    ).add_instruction(
        instruction=EmptyScan(schema=schema),
        resource_request=ResourceRequest(memory_bytes=0),
    )
    yield scan_step

@dataclass(frozen=True)
class ScanWithTask(execution_step.SingleOutputInstruction):
@@ -63,6 +73,23 @@ def run_partial_metadata(self, input_metadatas: list[PartialPartitionMetadata])
            )
        ]

@dataclass(frozen=True)
class EmptyScan(execution_step.SingleOutputInstruction):
    schema: Schema

    def run(self, inputs: list[MicroPartition]) -> list[MicroPartition]:
        # No inputs to read: emit a single zero-row partition with this schema.
        return [MicroPartition.empty(self.schema)]

    def run_partial_metadata(self, input_metadatas: list[PartialPartitionMetadata]) -> list[PartialPartitionMetadata]:
        assert len(input_metadatas) == 0

        return [
            PartialPartitionMetadata(
                num_rows=0,
                size_bytes=0,
            )
        ]


def tabular_scan(
    schema: PySchema,
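For context (not part of the diff): a minimal sketch of how the new instruction behaves. EmptyScan.run ignores its empty input list and returns a single zero-row MicroPartition built from the stored schema, while run_partial_metadata reports zero rows and zero bytes up front. The `schema` below is assumed to be an existing daft Schema, and `len(part)` assumes MicroPartition exposes its row count via __len__.

    # Hedged sketch: driving the EmptyScan instruction by hand.
    instruction = EmptyScan(schema=schema)  # schema: assumed daft Schema

    [part] = instruction.run([])  # no inputs -> exactly one empty MicroPartition
    assert len(part) == 0  # zero rows, full schema preserved

    [meta] = instruction.run_partial_metadata([])
    assert meta.num_rows == 0 and meta.size_bytes == 0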
22 changes: 22 additions & 0 deletions src/daft-plan/src/physical_ops/empty_scan.rs
@@ -0,0 +1,22 @@
use crate::PartitionSpec;
use daft_core::schema::SchemaRef;
use serde::{Deserialize, Serialize};
use std::sync::Arc;

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct EmptyScan {
    pub schema: SchemaRef,
    pub partition_spec: Arc<PartitionSpec>,
}

impl EmptyScan {
    pub(crate) fn new(
        schema: SchemaRef,
        partition_spec: Arc<PartitionSpec>,
    ) -> Self {
        Self {
            schema,
            partition_spec,
        }
    }
}
2 changes: 2 additions & 0 deletions src/daft-plan/src/physical_ops/mod.rs
@@ -4,6 +4,7 @@ mod coalesce;
mod concat;
mod csv;
mod empty_scan;
mod explode;
mod fanout;
mod filter;
mod flatten;
@@ -25,6 +26,7 @@ pub use coalesce::Coalesce;
pub use concat::Concat;
pub use csv::{TabularScanCsv, TabularWriteCsv};
pub use empty_scan::EmptyScan;
pub use explode::Explode;
pub use fanout::{FanoutByHash, FanoutByRange, FanoutRandom};
pub use filter::Filter;
pub use flatten::Flatten;
18 changes: 18 additions & 0 deletions src/daft-plan/src/physical_plan.rs
@@ -37,6 +37,7 @@ pub enum PhysicalPlan {
    TabularScanCsv(TabularScanCsv),
    TabularScanJson(TabularScanJson),
    TabularScan(TabularScan),
    EmptyScan(EmptyScan),
    Project(Project),
    Filter(Filter),
    Limit(Limit),
@@ -65,6 +66,7 @@ impl PhysicalPlan {
            #[cfg(feature = "python")]
            Self::InMemoryScan(InMemoryScan { partition_spec, .. }) => partition_spec.clone(),
            Self::TabularScan(TabularScan { partition_spec, .. }) => partition_spec.clone(),
            Self::EmptyScan(EmptyScan { partition_spec, .. }) => partition_spec.clone(),
            Self::TabularScanParquet(TabularScanParquet { partition_spec, .. }) => {
                partition_spec.clone()
            }
@@ -179,6 +181,7 @@ impl PhysicalPlan {
                .iter()
                .map(|scan_task| scan_task.size_bytes())
                .sum::<Option<usize>>(),
            Self::EmptyScan(..) => Some(0),
            // Assume no row/column pruning in cardinality-affecting operations.
            // TODO(Clark): Estimate row/column pruning to get a better size approximation.
            Self::Filter(Filter { input, .. })
@@ -392,6 +395,21 @@ impl PhysicalPlan {
                        .collect::<Vec<PyScanTask>>(),))?;
                Ok(py_iter.into())
            }
            PhysicalPlan::EmptyScan(EmptyScan { schema, .. }) => {
                let schema_mod = py.import(pyo3::intern!(py, "daft.logical.schema"))?;
                let python_schema = schema_mod
                    .getattr(pyo3::intern!(py, "Schema"))?
                    .getattr(pyo3::intern!(py, "_from_pyschema"))?
                    .call1((PySchema { schema: schema.clone() },))?;

                let py_iter = py
                    .import(pyo3::intern!(py, "daft.execution.rust_physical_plan_shim"))?
                    .getattr(pyo3::intern!(py, "empty_scan"))?
                    .call1((python_schema,))?;
                Ok(py_iter.into())
            }
            PhysicalPlan::TabularScanParquet(TabularScanParquet {
                projection_schema,
                external_info:
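Roughly what that pyo3 block does, written as plain Python (an illustrative sketch, not part of the diff; `pyschema` stands for the PySchema wrapper handed over from Rust):

    from daft.logical.schema import Schema
    from daft.execution import rust_physical_plan_shim

    # Rebuild the Python-side Schema from the Rust wrapper...
    python_schema = Schema._from_pyschema(pyschema)
    # ...then call the shim, which yields the single empty-scan task.
    py_iter = rust_physical_plan_shim.empty_scan(python_schema)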
36 changes: 26 additions & 10 deletions src/daft-plan/src/planner.rs
@@ -75,6 +75,7 @@ pub fn plan(logical_plan: &LogicalPlan, cfg: Arc<DaftExecutionConfig>) -> DaftRe
            SourceInfo::ExternalInfo(ExternalSourceInfo::Scan(ScanExternalInfo {
                pushdowns,
                scan_op,
                source_schema,
                ..
            })) => {
                let scan_tasks = scan_op.0.to_scan_tasks(pushdowns.clone())?;
@@ -85,17 +86,32 @@
                    cfg.merge_scan_tasks_min_size_bytes,
                    cfg.merge_scan_tasks_max_size_bytes,
                );
                let scan_tasks = scan_tasks.collect::<DaftResult<Vec<_>>>()?;
                if scan_tasks.is_empty() {
                    // All scan tasks were pruned away (e.g. by pushdowns), so
                    // plan a single empty partition that keeps the source schema.
                    let partition_spec = Arc::new(PartitionSpec::new_internal(
                        PartitionScheme::Unknown,
                        1,
                        None,
                    ));

                    Ok(PhysicalPlan::EmptyScan(EmptyScan::new(
                        source_schema.clone(),
                        partition_spec,
                    )))
                } else {
                    let partition_spec = Arc::new(PartitionSpec::new_internal(
                        PartitionScheme::Unknown,
                        scan_tasks.len(),
                        None,
                    ));

                    Ok(PhysicalPlan::TabularScan(TabularScan::new(
                        scan_tasks,
                        partition_spec,
                    )))
                }
            }
            #[cfg(feature = "python")]
            SourceInfo::InMemoryInfo(mem_info) => {
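The user-visible effect of this fallback, as a hedged illustration mirroring the integration-test change below (`tab` stands for an Iceberg table partitioned on "dt"): a predicate that prunes every file now plans to an EmptyScan and collects into a zero-row DataFrame that keeps the source schema, instead of producing a TabularScan with no tasks.

    from datetime import date
    import daft

    df = daft.read_iceberg(tab)                 # tab: assumed Iceberg table handle
    df = df.where(df["dt"] > date(2025, 1, 1))  # prunes every file at plan time
    df.collect()                                # plans to EmptyScan, not TabularScan
    assert len(df.to_pandas()) == 0             # zero rows, schema preserved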
5 changes: 3 additions & 2 deletions tests/integration/iceberg/test_table_load.py
@@ -80,11 +80,12 @@ def test_daft_iceberg_table_predicate_pushdown_months(local_iceberg_catalog):

    tab = local_iceberg_catalog.load_table("default.test_partitioned_by_months")
    df = daft.read_iceberg(tab)
    df = df.where(df["dt"] > date(2025, 1, 1))
    df.collect()

    daft_pandas = df.to_pandas()
    iceberg_pandas = tab.scan().to_arrow().to_pandas()
    assert_df_equals(daft_pandas, iceberg_pandas, sort_key=[])
