Skip to content

Commit

Permalink
[FEAT] Example Analyze for Local Execution Engine (#2648)
Browse files Browse the repository at this point in the history
  • Loading branch information
samster25 authored Aug 16, 2024
1 parent d5f7a26 commit b15b0c7
Show file tree
Hide file tree
Showing 21 changed files with 597 additions and 38 deletions.
18 changes: 18 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/common/display/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use mermaid::MermaidDisplayOptions;
pub mod ascii;
pub mod mermaid;
pub mod tree;
pub mod utils;

pub trait DisplayAs {
fn display_as(&self, level: DisplayLevel) -> String;
Expand Down
27 changes: 23 additions & 4 deletions src/common/display/src/mermaid.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ pub struct MermaidDisplayOptions {
/// This is useful for large trees.
/// In simple mode, the display string is just the node's name.
pub simple: bool,
/// Display the root node at the bottom of the diagram or at the top
pub bottom_up: bool,
/// subgraph_options is used to configure the subgraph.
/// Since some common displays (jupyter) don't support multiple mermaid graphs in a single cell, we need to use subgraphs.
/// The subgraph_options is used to both indicate that a subgraph should be used, and to configure the subgraph.
Expand All @@ -38,8 +40,12 @@ impl<T: TreeDisplay> MermaidDisplay for T {
false => DisplayLevel::Default,
};

let mut visitor =
MermaidDisplayVisitor::new(&mut s, display_type, options.subgraph_options);
let mut visitor = MermaidDisplayVisitor::new(
&mut s,
display_type,
options.bottom_up,
options.subgraph_options,
);

let _ = visitor.fmt(self);
s
Expand All @@ -49,6 +55,8 @@ impl<T: TreeDisplay> MermaidDisplay for T {
pub struct MermaidDisplayVisitor<'a, W> {
output: &'a mut W,
t: DisplayLevel,
/// Should the root node be at the bottom or the top of the diagram
bottom_up: bool,
/// each node should only appear once in the tree.
/// the key is the node's `multiline_display` string, and the value is the node's id.
/// This is necessary because the same kind of node can appear multiple times in the tree. (such as multiple filters)
Expand All @@ -59,10 +67,16 @@ pub struct MermaidDisplayVisitor<'a, W> {
}

impl<'a, W> MermaidDisplayVisitor<'a, W> {
pub fn new(w: &'a mut W, t: DisplayLevel, subgraph_options: Option<SubgraphOptions>) -> Self {
pub fn new(
w: &'a mut W,
t: DisplayLevel,
bottom_up: bool,
subgraph_options: Option<SubgraphOptions>,
) -> Self {
Self {
output: w,
t,
bottom_up,
nodes: IndexMap::new(),
node_count: 0,
subgraph_options,
Expand Down Expand Up @@ -138,7 +152,12 @@ where
writeln!(self.output, "end")?;
}
None => {
writeln!(self.output, "flowchart TD")?;
if self.bottom_up {
writeln!(self.output, "flowchart BT")?;
} else {
writeln!(self.output, "flowchart TD")?;
}

self.fmt_node(node)?;
}
}
Expand Down
18 changes: 18 additions & 0 deletions src/common/display/src/utils.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
pub fn bytes_to_human_readable(byte_count: usize) -> String {
if byte_count == 0 {
return "0 B".to_string();
}

const UNITS: &[&str] = &["B", "KiB", "MiB", "GiB", "TiB", "PiB", "EiB", "ZiB", "YiB"];
let base = byte_count.ilog2() / 10; // log2(1024) = 10

let index = std::cmp::min(base, (UNITS.len() - 1) as u32);
let basis = 1usize << (10 * index);
let scaled_value = (byte_count as f64) / (basis as f64);
let unit = UNITS.get(index as usize).unwrap();
if index == 0 {
format!("{byte_count} {unit}")
} else {
format!("{scaled_value:.2} {unit}")
}
}
4 changes: 2 additions & 2 deletions src/daft-io/src/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ impl IOStatsContext {
}

#[inline]
pub(crate) fn mark_get_requests(&self, num_requests: usize) {
pub fn mark_get_requests(&self, num_requests: usize) {
self.num_get_requests
.fetch_add(num_requests, atomic::Ordering::Relaxed);
}
Expand Down Expand Up @@ -105,7 +105,7 @@ impl IOStatsContext {
}

#[inline]
pub(crate) fn mark_bytes_read(&self, bytes_read: usize) {
pub fn mark_bytes_read(&self, bytes_read: usize) {
self.bytes_read
.fetch_add(bytes_read, atomic::Ordering::Relaxed);
}
Expand Down
4 changes: 3 additions & 1 deletion src/daft-local-execution/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
[dependencies]
async-stream = {workspace = true}
async-trait = {workspace = true}
common-display = {path = "../common/display", default-features = false}
common-error = {path = "../common/error", default-features = false}
common-tracing = {path = "../common/tracing", default-features = false}
daft-core = {path = "../daft-core", default-features = false}
Expand All @@ -17,13 +18,14 @@ daft-stats = {path = "../daft-stats", default-features = false}
daft-table = {path = "../daft-table", default-features = false}
futures = {workspace = true}
lazy_static = {workspace = true}
num-format = "0.4.4"
pyo3 = {workspace = true, optional = true}
snafu = {workspace = true}
tokio = {workspace = true}
tracing = {workspace = true}

[features]
python = ["dep:pyo3", "common-error/python", "daft-dsl/python", "daft-io/python", "daft-micropartition/python", "daft-plan/python", "daft-scan/python"]
python = ["dep:pyo3", "common-error/python", "daft-dsl/python", "daft-io/python", "daft-micropartition/python", "daft-plan/python", "daft-scan/python", "common-display/python"]

[package]
edition = {workspace = true}
Expand Down
51 changes: 47 additions & 4 deletions src/daft-local-execution/src/intermediate_ops/intermediate_op.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::sync::Arc;

use common_display::tree::TreeDisplay;
use common_error::DaftResult;
use daft_micropartition::MicroPartition;
use tracing::{info_span, instrument};
Expand All @@ -12,30 +13,40 @@ use crate::{
SingleSender,
},
pipeline::PipelineNode,
runtime_stats::{CountingSender, RuntimeStatsContext},
ExecutionRuntimeHandle, NUM_CPUS,
};

use super::state::OperatorTaskState;
pub trait IntermediateOperator: Send + Sync {
fn execute(&self, input: &Arc<MicroPartition>) -> DaftResult<Arc<MicroPartition>>;
#[allow(dead_code)]

fn name(&self) -> &'static str;
}

pub(crate) struct IntermediateNode {
intermediate_op: Arc<dyn IntermediateOperator>,
children: Vec<Box<dyn PipelineNode>>,
runtime_stats: Arc<RuntimeStatsContext>,
}

impl IntermediateNode {
pub(crate) fn new(
intermediate_op: Arc<dyn IntermediateOperator>,
children: Vec<Box<dyn PipelineNode>>,
) -> Self {
IntermediateNode {
let rts = RuntimeStatsContext::new();
Self::new_with_runtime_stats(intermediate_op, children, rts)
}

pub(crate) fn new_with_runtime_stats(
intermediate_op: Arc<dyn IntermediateOperator>,
children: Vec<Box<dyn PipelineNode>>,
runtime_stats: Arc<RuntimeStatsContext>,
) -> Self {
Self {
intermediate_op,
children,
runtime_stats,
}
}

Expand All @@ -48,11 +59,14 @@ impl IntermediateNode {
op: Arc<dyn IntermediateOperator>,
mut receiver: SingleReceiver,
sender: SingleSender,
rt_context: Arc<RuntimeStatsContext>,
) -> DaftResult<()> {
let mut state = OperatorTaskState::new();
let span = info_span!("IntermediateOp::execute");
let sender = CountingSender::new(sender, rt_context.clone());
while let Some(morsel) = receiver.recv().await {
let result = span.in_scope(|| op.execute(&morsel))?;
rt_context.mark_rows_received(morsel.len() as u64);
let result = rt_context.in_span(&span, || op.execute(&morsel))?;
state.add(result);
if let Some(part) = state.try_clear() {
let _ = sender.send(part?).await;
Expand All @@ -78,6 +92,7 @@ impl IntermediateNode {
self.intermediate_op.clone(),
worker_receiver,
destination_sender,
self.runtime_stats.clone(),
));
worker_senders.push(worker_sender);
}
Expand All @@ -102,12 +117,37 @@ impl IntermediateNode {
}
}

impl TreeDisplay for IntermediateNode {
fn display_as(&self, level: common_display::DisplayLevel) -> String {
use std::fmt::Write;
let mut display = String::new();
writeln!(display, "{}", self.intermediate_op.name()).unwrap();
use common_display::DisplayLevel::*;
match level {
Compact => {}
_ => {
let rt_result = self.runtime_stats.result();
rt_result.display(&mut display, true, true, true).unwrap();
}
}
display
}

fn get_children(&self) -> Vec<&dyn TreeDisplay> {
self.children.iter().map(|v| v.as_tree_display()).collect()
}
}

#[async_trait]
impl PipelineNode for IntermediateNode {
fn children(&self) -> Vec<&dyn PipelineNode> {
self.children.iter().map(|v| v.as_ref()).collect()
}

fn name(&self) -> &'static str {
self.intermediate_op.name()
}

async fn start(
&mut self,
mut destination: MultiSender,
Expand All @@ -129,4 +169,7 @@ impl PipelineNode for IntermediateNode {
runtime_handle.spawn(Self::send_to_workers(receiver, worker_senders));
Ok(())
}
fn as_tree_display(&self) -> &dyn TreeDisplay {
self
}
}
2 changes: 1 addition & 1 deletion src/daft-local-execution/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ mod channel;
mod intermediate_ops;
mod pipeline;
mod run;
mod runtime_stats;
mod sinks;
mod sources;

use common_error::{DaftError, DaftResult};
use lazy_static::lazy_static;
pub use run::NativeExecutor;
Expand Down
18 changes: 17 additions & 1 deletion src/daft-local-execution/src/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use crate::{
};

use async_trait::async_trait;
use common_display::{mermaid::MermaidDisplayVisitor, tree::TreeDisplay};
use common_error::DaftResult;
use daft_dsl::Expr;
use daft_micropartition::MicroPartition;
Expand All @@ -30,13 +31,28 @@ use daft_plan::populate_aggregation_stages;
use crate::channel::MultiSender;

#[async_trait]
pub trait PipelineNode: Sync + Send {
pub trait PipelineNode: Sync + Send + TreeDisplay {
fn children(&self) -> Vec<&dyn PipelineNode>;
fn name(&self) -> &'static str;
async fn start(
&mut self,
destination: MultiSender,
runtime_handle: &mut ExecutionRuntimeHandle,
) -> DaftResult<()>;

fn as_tree_display(&self) -> &dyn TreeDisplay;
}

pub(crate) fn viz_pipeline(root: &dyn PipelineNode) -> String {
let mut output = String::new();
let mut visitor = MermaidDisplayVisitor::new(
&mut output,
common_display::DisplayLevel::Default,
true,
Default::default(),
);
visitor.fmt(root.as_tree_display()).unwrap();
output
}

pub fn physical_plan_to_pipeline(
Expand Down
Loading

0 comments on commit b15b0c7

Please sign in to comment.