From e93dd94df2f34373107b621d8019afd656b255ad Mon Sep 17 00:00:00 2001 From: Colin Ho Date: Tue, 15 Oct 2024 13:37:28 -0700 Subject: [PATCH] [FEAT] Enable to_json_string for physical plan (#3023) https://github.com/Eventual-Inc/Daft/issues/2977 Allows physical plan to be serialized to be json strings. The script below shows how you can extract the sql queries from read sql. ``` import daft import json def create_conn(): return create_engine('sqlite:///test_database.db').connect() df = daft.read_sql("SELECT * FROM test_data", create_conn, partition_col="id", num_partitions=3) physical_plan_scheduler = df._builder.to_physical_plan_scheduler( daft.context.get_context().daft_execution_config ) physical_plan_dict = json.loads(physical_plan_scheduler.to_json_string()) # collect all sql queries sql_queries = [] for task in physical_plan_dict['TabularScan']['scan_tasks']: sql_queries.append(task['file_format_config']['Database']['sql']) print(sql_queries) ['SELECT * FROM (SELECT * FROM test_data) AS subquery WHERE id >= 1 AND id < 34', 'SELECT * FROM (SELECT * FROM test_data) AS subquery WHERE id >= 34 AND id < 67', 'SELECT * FROM (SELECT * FROM test_data) AS subquery WHERE id >= 67 AND id <= 100'] ``` --------- Co-authored-by: Colin Ho --- Cargo.lock | 1 + daft/daft/__init__.pyi | 1 + daft/logical/builder.py | 3 +++ daft/plan_scheduler/physical_plan_scheduler.py | 3 +++ src/daft-scheduler/Cargo.toml | 1 + src/daft-scheduler/src/scheduler.rs | 4 ++++ 6 files changed, 13 insertions(+) diff --git a/Cargo.lock b/Cargo.lock index 0dc52e53bd..8d7b1b8401 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2132,6 +2132,7 @@ dependencies = [ "pyo3", "rstest", "serde", + "serde_json", ] [[package]] diff --git a/daft/daft/__init__.pyi b/daft/daft/__init__.pyi index 798632d51f..ccb8908420 100644 --- a/daft/daft/__init__.pyi +++ b/daft/daft/__init__.pyi @@ -1626,6 +1626,7 @@ class PhysicalPlanScheduler: def num_partitions(self) -> int: ... def repr_ascii(self, simple: bool) -> str: ... def repr_mermaid(self, options: MermaidOptions) -> str: ... + def to_json_string(self) -> str: ... def to_partition_tasks(self, psets: dict[str, list[PartitionT]]) -> physical_plan.InProgressPhysicalPlan: ... def run(self, psets: dict[str, list[PartitionT]]) -> Iterator[PyMicroPartition]: ... diff --git a/daft/logical/builder.py b/daft/logical/builder.py index 7f8ed96cf2..b69dc011d2 100644 --- a/daft/logical/builder.py +++ b/daft/logical/builder.py @@ -62,6 +62,9 @@ def to_physical_plan_scheduler(self, daft_execution_config: PyDaftExecutionConfi used to generate executable tasks for the physical plan. This should be called after triggering optimization with self.optimize(). + + **Warning**: This function is not part of the stable API and may change + without notice. It is intended for internal or experimental use only. """ from daft.plan_scheduler.physical_plan_scheduler import PhysicalPlanScheduler diff --git a/daft/plan_scheduler/physical_plan_scheduler.py b/daft/plan_scheduler/physical_plan_scheduler.py index 43bab81dbe..d384683c48 100644 --- a/daft/plan_scheduler/physical_plan_scheduler.py +++ b/daft/plan_scheduler/physical_plan_scheduler.py @@ -53,6 +53,9 @@ def pretty_print(self, simple: bool = False, format: str = "ascii") -> str: def __repr__(self) -> str: return self._scheduler.repr_ascii(simple=False) + def to_json_string(self) -> str: + return self._scheduler.to_json_string() + def to_partition_tasks( self, psets: dict[str, list[PartitionT]], results_buffer_size: int | None ) -> physical_plan.MaterializedPhysicalPlan: diff --git a/src/daft-scheduler/Cargo.toml b/src/daft-scheduler/Cargo.toml index 14228f79b9..2911eaa38b 100644 --- a/src/daft-scheduler/Cargo.toml +++ b/src/daft-scheduler/Cargo.toml @@ -11,6 +11,7 @@ daft-plan = {path = "../daft-plan", default-features = false} daft-scan = {path = "../daft-scan", default-features = false} pyo3 = {workspace = true, optional = true} serde = {workspace = true, features = ["rc"]} +serde_json = {workspace = true} [dev-dependencies] rstest = {workspace = true} diff --git a/src/daft-scheduler/src/scheduler.rs b/src/daft-scheduler/src/scheduler.rs index eeedc4471a..d65dc18a9f 100644 --- a/src/daft-scheduler/src/scheduler.rs +++ b/src/daft-scheduler/src/scheduler.rs @@ -80,6 +80,10 @@ impl PhysicalPlanScheduler { Ok(self.plan().repr_mermaid(options)) } + pub fn to_json_string(&self) -> PyResult { + serde_json::to_string(&self.plan()) + .map_err(|e| pyo3::PyErr::new::(e.to_string())) + } /// Converts the contained physical plan into an iterator of executable partition tasks. pub fn to_partition_tasks( &self,