diff --git a/Cargo.lock b/Cargo.lock index 0dc52e53bd..8d7b1b8401 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2132,6 +2132,7 @@ dependencies = [ "pyo3", "rstest", "serde", + "serde_json", ] [[package]] diff --git a/daft/daft/__init__.pyi b/daft/daft/__init__.pyi index 798632d51f..ccb8908420 100644 --- a/daft/daft/__init__.pyi +++ b/daft/daft/__init__.pyi @@ -1626,6 +1626,7 @@ class PhysicalPlanScheduler: def num_partitions(self) -> int: ... def repr_ascii(self, simple: bool) -> str: ... def repr_mermaid(self, options: MermaidOptions) -> str: ... + def to_json_string(self) -> str: ... def to_partition_tasks(self, psets: dict[str, list[PartitionT]]) -> physical_plan.InProgressPhysicalPlan: ... def run(self, psets: dict[str, list[PartitionT]]) -> Iterator[PyMicroPartition]: ... diff --git a/daft/logical/builder.py b/daft/logical/builder.py index 7f8ed96cf2..b69dc011d2 100644 --- a/daft/logical/builder.py +++ b/daft/logical/builder.py @@ -62,6 +62,9 @@ def to_physical_plan_scheduler(self, daft_execution_config: PyDaftExecutionConfi used to generate executable tasks for the physical plan. This should be called after triggering optimization with self.optimize(). + + **Warning**: This function is not part of the stable API and may change + without notice. It is intended for internal or experimental use only. """ from daft.plan_scheduler.physical_plan_scheduler import PhysicalPlanScheduler diff --git a/daft/plan_scheduler/physical_plan_scheduler.py b/daft/plan_scheduler/physical_plan_scheduler.py index 43bab81dbe..d384683c48 100644 --- a/daft/plan_scheduler/physical_plan_scheduler.py +++ b/daft/plan_scheduler/physical_plan_scheduler.py @@ -53,6 +53,9 @@ def pretty_print(self, simple: bool = False, format: str = "ascii") -> str: def __repr__(self) -> str: return self._scheduler.repr_ascii(simple=False) + def to_json_string(self) -> str: + return self._scheduler.to_json_string() + def to_partition_tasks( self, psets: dict[str, list[PartitionT]], results_buffer_size: int | None ) -> physical_plan.MaterializedPhysicalPlan: diff --git a/src/daft-scheduler/Cargo.toml b/src/daft-scheduler/Cargo.toml index 14228f79b9..2911eaa38b 100644 --- a/src/daft-scheduler/Cargo.toml +++ b/src/daft-scheduler/Cargo.toml @@ -11,6 +11,7 @@ daft-plan = {path = "../daft-plan", default-features = false} daft-scan = {path = "../daft-scan", default-features = false} pyo3 = {workspace = true, optional = true} serde = {workspace = true, features = ["rc"]} +serde_json = {workspace = true} [dev-dependencies] rstest = {workspace = true} diff --git a/src/daft-scheduler/src/scheduler.rs b/src/daft-scheduler/src/scheduler.rs index eeedc4471a..d65dc18a9f 100644 --- a/src/daft-scheduler/src/scheduler.rs +++ b/src/daft-scheduler/src/scheduler.rs @@ -80,6 +80,10 @@ impl PhysicalPlanScheduler { Ok(self.plan().repr_mermaid(options)) } + pub fn to_json_string(&self) -> PyResult { + serde_json::to_string(&self.plan()) + .map_err(|e| pyo3::PyErr::new::(e.to_string())) + } /// Converts the contained physical plan into an iterator of executable partition tasks. pub fn to_partition_tasks( &self,