Skip to content

Commit

Permalink
Properly dispatch limited reads
Browse files Browse the repository at this point in the history
  • Loading branch information
Xiayue Charles Lin committed Oct 7, 2023
1 parent 9c32d73 commit 05dd97a
Showing 1 changed file with 37 additions and 10 deletions.
47 changes: 37 additions & 10 deletions src/daft-plan/src/physical_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use {
ExternalInfo, FileFormat, FileFormatConfig, FileInfos, InMemoryInfo,
PyFileFormatConfig, PyStorageConfig, StorageConfig,
},
PartitionSpec,
},
daft_core::python::schema::PySchema,
daft_core::schema::SchemaRef,
Expand Down Expand Up @@ -110,6 +111,7 @@ fn tabular_scan(
file_infos: &Arc<FileInfos>,
file_format_config: &Arc<FileFormatConfig>,
storage_config: &Arc<StorageConfig>,
partition_spec: &PartitionSpec,
limit: &Option<usize>,
is_ray_runner: bool,
) -> PyResult<PyObject> {
Expand All @@ -131,7 +133,17 @@ fn tabular_scan(
*limit,
is_ray_runner,
))?;
Ok(py_iter.into())

if let Some(limit) = limit {
apply_limit(
py,
py_iter.into(),
*limit as i64,
partition_spec.num_partitions,
)
} else {
Ok(py_iter.into())
}
}

#[cfg(feature = "python")]
Expand Down Expand Up @@ -163,6 +175,23 @@ fn tabular_write(
Ok(py_iter.into())
}

#[cfg(feature = "python")]
fn apply_limit(
py: Python<'_>,
upstream_iter: PyObject,
limit: i64,
num_partitions: usize,
) -> PyResult<PyObject> {
let py_physical_plan = py.import(pyo3::intern!(py, "daft.execution.physical_plan"))?;
let local_limit_iter = py_physical_plan
.getattr(pyo3::intern!(py, "local_limit"))?
.call1((upstream_iter, limit))?;
let global_limit_iter = py_physical_plan
.getattr(pyo3::intern!(py, "global_limit"))?
.call1((local_limit_iter, limit, num_partitions))?;
Ok(global_limit_iter.into())
}

#[cfg(feature = "python")]
impl PhysicalPlan {
pub fn to_partition_tasks(
Expand Down Expand Up @@ -196,6 +225,7 @@ impl PhysicalPlan {
storage_config,
..
},
partition_spec,
limit,
..
}) => tabular_scan(
Expand All @@ -205,6 +235,7 @@ impl PhysicalPlan {
file_infos,
file_format_config,
storage_config,
partition_spec,
limit,
is_ray_runner,
),
Expand All @@ -218,6 +249,7 @@ impl PhysicalPlan {
storage_config,
..
},
partition_spec,
limit,
..
}) => tabular_scan(
Expand All @@ -227,6 +259,7 @@ impl PhysicalPlan {
file_infos,
file_format_config,
storage_config,
partition_spec,
limit,
is_ray_runner,
),
Expand All @@ -240,6 +273,7 @@ impl PhysicalPlan {
storage_config,
..
},
partition_spec,
limit,
..
}) => tabular_scan(
Expand All @@ -249,6 +283,7 @@ impl PhysicalPlan {
file_infos,
file_format_config,
storage_config,
partition_spec,
limit,
is_ray_runner,
),
Expand Down Expand Up @@ -299,15 +334,7 @@ impl PhysicalPlan {
num_partitions,
}) => {
let upstream_iter = input.to_partition_tasks(py, psets, is_ray_runner)?;
let py_physical_plan =
py.import(pyo3::intern!(py, "daft.execution.physical_plan"))?;
let local_limit_iter = py_physical_plan
.getattr(pyo3::intern!(py, "local_limit"))?
.call1((upstream_iter, *limit))?;
let global_limit_iter = py_physical_plan
.getattr(pyo3::intern!(py, "global_limit"))?
.call1((local_limit_iter, *limit, *num_partitions))?;
Ok(global_limit_iter.into())
apply_limit(py, upstream_iter, *limit, *num_partitions)
}
PhysicalPlan::Explode(Explode { input, to_explode }) => {
let upstream_iter = input.to_partition_tasks(py, psets, is_ray_runner)?;
Expand Down

0 comments on commit 05dd97a

Please sign in to comment.