Skip to content

Commit

Permalink
[FEAT] Enable to_json_string for physical plan (#3023)
Browse files Browse the repository at this point in the history
#2977

Allows physical plan to be serialized to be json strings. The script
below shows how you can extract the sql queries from read sql.

```
import daft
import json

def create_conn():
    return create_engine('sqlite:///test_database.db').connect()

df = daft.read_sql("SELECT * FROM test_data", create_conn, partition_col="id", num_partitions=3)
physical_plan_scheduler = df._builder.to_physical_plan_scheduler(
    daft.context.get_context().daft_execution_config
)

physical_plan_dict = json.loads(physical_plan_scheduler.to_json_string())

# collect all sql queries
sql_queries = []
for task in physical_plan_dict['TabularScan']['scan_tasks']:
    sql_queries.append(task['file_format_config']['Database']['sql'])
print(sql_queries)

['SELECT * FROM (SELECT * FROM test_data) AS subquery WHERE id >= 1 AND id < 34', 'SELECT * FROM (SELECT * FROM test_data) AS subquery WHERE id >= 34 AND id < 67', 'SELECT * FROM (SELECT * FROM test_data) AS subquery WHERE id >= 67 AND id <= 100']
```

---------

Co-authored-by: Colin Ho <[email protected]>
  • Loading branch information
colin-ho and Colin Ho authored Oct 15, 2024
1 parent ba0b871 commit e93dd94
Show file tree
Hide file tree
Showing 6 changed files with 13 additions and 0 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions daft/daft/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -1626,6 +1626,7 @@ class PhysicalPlanScheduler:
def num_partitions(self) -> int: ...
def repr_ascii(self, simple: bool) -> str: ...
def repr_mermaid(self, options: MermaidOptions) -> str: ...
def to_json_string(self) -> str: ...
def to_partition_tasks(self, psets: dict[str, list[PartitionT]]) -> physical_plan.InProgressPhysicalPlan: ...
def run(self, psets: dict[str, list[PartitionT]]) -> Iterator[PyMicroPartition]: ...

Expand Down
3 changes: 3 additions & 0 deletions daft/logical/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ def to_physical_plan_scheduler(self, daft_execution_config: PyDaftExecutionConfi
used to generate executable tasks for the physical plan.
This should be called after triggering optimization with self.optimize().
**Warning**: This function is not part of the stable API and may change
without notice. It is intended for internal or experimental use only.
"""
from daft.plan_scheduler.physical_plan_scheduler import PhysicalPlanScheduler

Expand Down
3 changes: 3 additions & 0 deletions daft/plan_scheduler/physical_plan_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ def pretty_print(self, simple: bool = False, format: str = "ascii") -> str:
def __repr__(self) -> str:
return self._scheduler.repr_ascii(simple=False)

def to_json_string(self) -> str:
return self._scheduler.to_json_string()

def to_partition_tasks(
self, psets: dict[str, list[PartitionT]], results_buffer_size: int | None
) -> physical_plan.MaterializedPhysicalPlan:
Expand Down
1 change: 1 addition & 0 deletions src/daft-scheduler/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ daft-plan = {path = "../daft-plan", default-features = false}
daft-scan = {path = "../daft-scan", default-features = false}
pyo3 = {workspace = true, optional = true}
serde = {workspace = true, features = ["rc"]}
serde_json = {workspace = true}

[dev-dependencies]
rstest = {workspace = true}
Expand Down
4 changes: 4 additions & 0 deletions src/daft-scheduler/src/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,10 @@ impl PhysicalPlanScheduler {
Ok(self.plan().repr_mermaid(options))
}

pub fn to_json_string(&self) -> PyResult<String> {
serde_json::to_string(&self.plan())
.map_err(|e| pyo3::PyErr::new::<pyo3::exceptions::PyValueError, _>(e.to_string()))
}
/// Converts the contained physical plan into an iterator of executable partition tasks.
pub fn to_partition_tasks(
&self,
Expand Down

0 comments on commit e93dd94

Please sign in to comment.