Skip to content

Commit

Permalink
[Query Planner] Add physical plan visualization option to `df.explain…
Browse files Browse the repository at this point in the history
…()`; implement `TreeVisitor` for `LogicalPlan` and `PhysicalPlan`. (#1836)

This PR adds a physical plan visualization option to `df.explain()`
(currently opt-in, off by default). In service of this, this PR also
implements `TreeVisitor` for `PhysicalPlan` and, as a drive-by, for
`LogicalPlan`. This should make expressing plan traversals much easier
in the future.

## Physical Plan Visualization Example

```python
In [1]: import daft

In [2]: df = daft.from_pydict({"a": [1, 2, 3], "b": ["a", "b", "c"], "c": [True, False, True]})

In [3]: df = df.select("a", "b")

In [4]: df = df.with_column("d", df["a"] + 1)

In [5]: df = df.where(df["d"] < 4)

In [6]: df.explain(include_physical=True)
Logical plan:

* Filter: col(d) < lit(4)
|
* Project: col(a), col(b), col(a) + lit(1) AS d
|
* Project: col(a), col(b)
|
* Source:
|   Number of partitions = 1
|   Output schema = a (Int64), b (Utf8), c (Boolean)


Physical plan:

* Filter: col(d) < lit(4)
|
* Project: col(a), col(b), col(a) + lit(1) AS d
|   Partition spec = { Scheme = Unknown, Num partitions = 1 }
|
* Project: col(a), col(b)
|   Partition spec = { Scheme = Unknown, Num partitions = 1 }
|
* InMemoryScan:
|   Schema = a (Int64), b (Utf8), c (Boolean)
|   Size bytes = 60
|   Partition spec = { Scheme = Unknown, Num partitions = 1 }
```
  • Loading branch information
clarkzinzow authored Feb 7, 2024
1 parent db7cfd1 commit f471738
Show file tree
Hide file tree
Showing 69 changed files with 1,375 additions and 292 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions daft/daft.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -1118,6 +1118,7 @@ class PhysicalPlanScheduler:

def num_partitions(self) -> int: ...
def partition_spec(self) -> PartitionSpec: ...
def repr_ascii(self, simple: bool) -> str: ...
def to_partition_tasks(
self, psets: dict[str, list[PartitionT]], is_ray_runner: bool
) -> physical_plan.InProgressPhysicalPlan: ...
Expand Down
24 changes: 16 additions & 8 deletions daft/dataframe/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,14 +116,16 @@ def _result(self) -> Optional[PartitionSet]:
return self._result_cache.value

@DataframePublicAPI
def explain(self, show_optimized: bool = False, simple=False) -> None:
"""Prints the logical plan that will be executed to produce this DataFrame.
Defaults to showing the unoptimized plan. Use `show_optimized` to show the optimized one.
def explain(self, show_all: bool = False, simple: bool = False) -> None:
"""Prints the (logical and physical) plans that will be executed to produce this DataFrame.
Defaults to showing the unoptimized logical plan. Use ``show_all=True`` to show the unoptimized logical plan,
the optimized logical plan, and the physical plan.
Args:
show_optimized (bool): shows the optimized QueryPlan instead of the unoptimized one.
simple (bool): Whether to only show the type of logical op for each node in the logical plan,
rather than showing details of how each logical op is configured.
show_all (bool): Whether to show the optimized logical plan and the physical plan in addition to the
unoptimized logical plan.
simple (bool): Whether to only show the type of op for each node in the plan, rather than showing details
of how each op is configured.
"""

if self._result_cache is not None:
Expand All @@ -133,9 +135,15 @@ def explain(self, show_optimized: bool = False, simple=False) -> None:
print("However here is the logical plan used to produce this result:\n")

builder = self.__builder
if show_optimized:
builder = builder.optimize()
print("== Unoptimized Logical Plan ==\n")
print(builder.pretty_print(simple))
if show_all:
print("\n== Optimized Logical Plan ==\n")
builder = builder.optimize()
print(builder.pretty_print(simple))
print("\n== Physical Plan ==\n")
physical_plan_scheduler = builder.to_physical_plan_scheduler(get_context().daft_execution_config)
print(physical_plan_scheduler.pretty_print(simple))

def num_partitions(self) -> int:
daft_execution_config = get_context().daft_execution_config
Expand Down
9 changes: 9 additions & 0 deletions daft/iceberg/iceberg_scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,15 @@ def _iceberg_record_to_partition_spec(self, record: Record) -> daft.table.Table
else:
return None

def multiline_display(self) -> list[str]:
return [
self.display_name(),
f"Schema = {self._schema}",
f"Partitioning keys = {self.partitioning_keys}",
# TODO(Clark): Improve repr of storage config here.
f"Storage config = {self._storage_config}",
]

def to_scan_tasks(self, pushdowns: Pushdowns) -> Iterator[ScanTask]:
limit = pushdowns.limit
iceberg_tasks = self._table.scan(limit=limit).plan_files()
Expand Down
4 changes: 4 additions & 0 deletions daft/io/scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ def can_absorb_limit(self) -> bool:
def can_absorb_select(self) -> bool:
raise NotImplementedError()

@abc.abstractmethod
def multiline_display(self) -> list[str]:
raise NotImplementedError()

@abc.abstractmethod
def to_scan_tasks(self, pushdowns: Pushdowns) -> Iterator[ScanTask]:
raise NotImplementedError()
12 changes: 12 additions & 0 deletions daft/plan_scheduler/physical_plan_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,18 @@ def num_partitions(self) -> int:
def partition_spec(self) -> PartitionSpec:
return self._scheduler.partition_spec()

def pretty_print(self, simple: bool = False) -> str:
"""
Pretty prints the current underlying physical plan.
"""
if simple:
return self._scheduler.repr_ascii(simple=True)
else:
return repr(self)

def __repr__(self) -> str:
return self._scheduler.repr_ascii(simple=False)

def to_partition_tasks(
self, psets: dict[str, list[PartitionT]], is_ray_runner: bool
) -> physical_plan.MaterializedPhysicalPlan:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ In this case, Daft is just deferring the work required to read the data and sele

When you call methods on a Daft Dataframe, it defers the work by adding to an internal "plan". You can examine the current plan of a DataFrame by calling :meth:`df.explain() <daft.DataFrame.explain>`!

Passing the ``show_optimized=True`` argument will show you the plan after Daft applies its query optimizations.
Passing the ``show_optimized=True`` argument will show you the plan after Daft applies its query optimizations, and passing ``include_physical=True`` will also show you the physical (lower-level) plan.

We can tell Daft to execute our DataFrame and cache the results using :meth:`df.collect() <daft.DataFrame.collect>`:

Expand Down
14 changes: 14 additions & 0 deletions src/common/io-config/src/azure.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,20 @@ pub struct AzureConfig {
pub anonymous: bool,
}

impl AzureConfig {
pub fn multiline_display(&self) -> Vec<String> {
let mut res = vec![];
if let Some(storage_account) = &self.storage_account {
res.push(format!("Storage account = {}", storage_account));
}
if let Some(access_key) = &self.access_key {
res.push(format!("Access key = {}", access_key));
}
res.push(format!("Anoynmous = {}", self.anonymous));
res
}
}

impl Display for AzureConfig {
fn fmt(&self, f: &mut Formatter<'_>) -> std::result::Result<(), std::fmt::Error> {
write!(
Expand Down
19 changes: 19 additions & 0 deletions src/common/io-config/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,25 @@ pub struct IOConfig {
pub gcs: GCSConfig,
}

impl IOConfig {
pub fn multiline_display(&self) -> Vec<String> {
let mut res = vec![];
res.push(format!(
"S3 config = {{ {} }}",
self.s3.multiline_display().join(", ")
));
res.push(format!(
"Azure config = {{ {} }}",
self.azure.multiline_display().join(", ")
));
res.push(format!(
"GCS config = {{ {} }}",
self.gcs.multiline_display().join(", ")
));
res
}
}

impl Display for IOConfig {
fn fmt(&self, f: &mut Formatter<'_>) -> std::result::Result<(), std::fmt::Error> {
write!(
Expand Down
11 changes: 11 additions & 0 deletions src/common/io-config/src/gcs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,17 @@ pub struct GCSConfig {
pub anonymous: bool,
}

impl GCSConfig {
pub fn multiline_display(&self) -> Vec<String> {
let mut res = vec![];
if let Some(project_id) = &self.project_id {
res.push(format!("Project ID = {}", project_id));
}
res.push(format!("Anoynmous = {}", self.anonymous));
res
}
}

impl Display for GCSConfig {
fn fmt(&self, f: &mut Formatter<'_>) -> std::result::Result<(), std::fmt::Error> {
write!(
Expand Down
39 changes: 39 additions & 0 deletions src/common/io-config/src/s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,45 @@ pub struct S3Config {
pub check_hostname_ssl: bool,
}

impl S3Config {
pub fn multiline_display(&self) -> Vec<String> {
let mut res = vec![];
if let Some(region_name) = &self.region_name {
res.push(format!("Region name = {}", region_name));
}
if let Some(endpoint_url) = &self.endpoint_url {
res.push(format!("Endpoint URL = {}", endpoint_url));
}
if let Some(key_id) = &self.key_id {
res.push(format!("Key ID = {}", key_id));
}
if let Some(session_token) = &self.session_token {
res.push(format!("Session token = {}", session_token));
}
if let Some(access_key) = &self.access_key {
res.push(format!("Access key = {}", access_key));
}
res.push(format!(
"Max connections = {}",
self.max_connections_per_io_thread
));
res.push(format!(
"Retry initial backoff ms = {}",
self.retry_initial_backoff_ms
));
res.push(format!("Connect timeout ms = {}", self.connect_timeout_ms));
res.push(format!("Read timeout ms = {}", self.read_timeout_ms));
res.push(format!("Max retries = {}", self.num_tries));
if let Some(retry_mode) = &self.retry_mode {
res.push(format!("Retry mode = {}", retry_mode));
}
res.push(format!("Anonymous = {}", self.anonymous));
res.push(format!("Verify SSL = {}", self.verify_ssl));
res.push(format!("Check hostname SSL = {}", self.check_hostname_ssl));
res
}
}

impl Default for S3Config {
fn default() -> Self {
S3Config {
Expand Down
8 changes: 8 additions & 0 deletions src/daft-core/src/datatypes/time_unit.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::fmt::{Display, Formatter};

use arrow2::datatypes::TimeUnit as ArrowTimeUnit;

use serde::{Deserialize, Serialize};
Expand All @@ -21,6 +23,12 @@ impl TimeUnit {
}
}
}
impl Display for TimeUnit {
fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
// Leverage Debug trait implementation, which will already return the enum variant as a string.
write!(f, "{:?}", self)
}
}

impl From<&ArrowTimeUnit> for TimeUnit {
fn from(tu: &ArrowTimeUnit) -> Self {
Expand Down
4 changes: 2 additions & 2 deletions src/daft-plan/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use daft_scan::{

#[cfg(feature = "python")]
use {
crate::{physical_plan::PhysicalPlan, source_info::InMemoryInfo},
crate::{physical_plan::PhysicalPlanRef, source_info::InMemoryInfo},
common_daft_config::PyDaftExecutionConfig,
daft_core::python::schema::PySchema,
daft_dsl::python::PyExpr,
Expand Down Expand Up @@ -507,7 +507,7 @@ impl PyLogicalPlanBuilder {
) -> PyResult<PhysicalPlanScheduler> {
py.allow_threads(|| {
let logical_plan = self.builder.build();
let physical_plan: Arc<PhysicalPlan> =
let physical_plan: PhysicalPlanRef =
plan(logical_plan.as_ref(), cfg.config.clone())?.into();
Ok(physical_plan.into())
})
Expand Down
22 changes: 22 additions & 0 deletions src/daft-plan/src/display.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,10 +125,32 @@ impl TreeDisplay for crate::LogicalPlan {
}
}

impl TreeDisplay for crate::physical_plan::PhysicalPlan {
fn get_multiline_representation(&self) -> Vec<String> {
self.multiline_display()
}

fn get_name(&self) -> String {
self.name()
}

fn get_children(&self) -> Vec<&Arc<Self>> {
self.children()
}
}

// Single node display.
impl Display for crate::LogicalPlan {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{}", self.get_multiline_representation().join(", "))?;
Ok(())
}
}

// Single node display.
impl Display for crate::physical_plan::PhysicalPlan {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{}", self.get_multiline_representation().join(", "))?;
Ok(())
}
}
1 change: 1 addition & 0 deletions src/daft-plan/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ mod sink_info;
mod source_info;
#[cfg(test)]
mod test;
mod treenode;

pub use builder::{LogicalPlanBuilder, PyLogicalPlanBuilder};
use daft_scan::{
Expand Down
13 changes: 3 additions & 10 deletions src/daft-plan/src/logical_ops/agg.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::sync::Arc;

use itertools::Itertools;
use snafu::ResultExt;

use daft_core::schema::{Schema, SchemaRef};
Expand Down Expand Up @@ -67,20 +68,12 @@ impl Aggregate {
let mut res = vec![];
res.push(format!(
"Aggregation: {}",
self.aggregations
.iter()
.map(|e| e.to_string())
.collect::<Vec<_>>()
.join(", ")
self.aggregations.iter().map(|e| e.to_string()).join(", ")
));
if !self.groupby.is_empty() {
res.push(format!(
"Group by = {}",
self.groupby
.iter()
.map(|e| e.to_string())
.collect::<Vec<_>>()
.join(", ")
self.groupby.iter().map(|e| e.to_string()).join(", ")
));
}
res.push(format!("Output schema = {}", self.schema().short_string()));
Expand Down
11 changes: 11 additions & 0 deletions src/daft-plan/src/logical_ops/explode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::sync::Arc;

use daft_core::schema::{Schema, SchemaRef};
use daft_dsl::Expr;
use itertools::Itertools;
use snafu::ResultExt;

use crate::{
Expand Down Expand Up @@ -51,4 +52,14 @@ impl Explode {
exploded_schema,
})
}

pub fn multiline_display(&self) -> Vec<String> {
let mut res = vec![];
res.push(format!(
"Explode: {}",
self.to_explode.iter().map(|e| e.to_string()).join(", ")
));
res.push(format!("Schema = {}", self.exploded_schema.short_string()));
res
}
}
19 changes: 4 additions & 15 deletions src/daft-plan/src/logical_ops/join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use daft_core::{
DataType,
};
use daft_dsl::Expr;
use itertools::Itertools;
use snafu::ResultExt;

use crate::{
Expand Down Expand Up @@ -121,31 +122,19 @@ impl Join {
if !self.left_on.is_empty() && !self.right_on.is_empty() && self.left_on == self.right_on {
res.push(format!(
"On = {}",
self.left_on
.iter()
.map(|e| e.to_string())
.collect::<Vec<_>>()
.join(", ")
self.left_on.iter().map(|e| e.to_string()).join(", ")
));
} else {
if !self.left_on.is_empty() {
res.push(format!(
"Left on = {}",
self.left_on
.iter()
.map(|e| e.to_string())
.collect::<Vec<_>>()
.join(", ")
self.left_on.iter().map(|e| e.to_string()).join(", ")
));
}
if !self.right_on.is_empty() {
res.push(format!(
"Right on = {}",
self.right_on
.iter()
.map(|e| e.to_string())
.collect::<Vec<_>>()
.join(", ")
self.right_on.iter().map(|e| e.to_string()).join(", ")
));
}
}
Expand Down
Loading

0 comments on commit f471738

Please sign in to comment.