diff --git a/src/daft-io/src/stats.rs b/src/daft-io/src/stats.rs index e672c8ba9e..cc626f10f3 100644 --- a/src/daft-io/src/stats.rs +++ b/src/daft-io/src/stats.rs @@ -1,13 +1,16 @@ -use std::sync::{ - atomic::{self}, - Arc, +use std::{ + borrow::Cow, + sync::{ + atomic::{self}, + Arc, + }, }; pub type IOStatsRef = Arc<IOStatsContext>; #[derive(Default, Debug)] pub struct IOStatsContext { - name: String, + name: Cow<'static, str>, num_get_requests: atomic::AtomicUsize, num_head_requests: atomic::AtomicUsize, num_list_requests: atomic::AtomicUsize, @@ -38,9 +41,9 @@ pub(crate) struct IOStatsByteStreamContextHandle { } impl IOStatsContext { - pub fn new(name: String) -> IOStatsRef { + pub fn new<S: Into<Cow<'static, str>>>(name: S) -> IOStatsRef { Arc::new(IOStatsContext { - name, + name: name.into(), num_get_requests: atomic::AtomicUsize::new(0), num_head_requests: atomic::AtomicUsize::new(0), num_list_requests: atomic::AtomicUsize::new(0), diff --git a/src/daft-micropartition/src/micropartition.rs b/src/daft-micropartition/src/micropartition.rs index a786315fd0..b53e769239 100644 --- a/src/daft-micropartition/src/micropartition.rs +++ b/src/daft-micropartition/src/micropartition.rs @@ -303,10 +303,7 @@ impl MicroPartition { } } - pub fn from_scan_task( - scan_task: Arc<ScanTask>, - io_stats: Option<IOStatsRef>, - ) -> crate::Result<Self> { + pub fn from_scan_task(scan_task: Arc<ScanTask>, io_stats: IOStatsRef) -> crate::Result<Self> { let schema = scan_task.schema.clone(); match ( &scan_task.metadata, @@ -355,7 +352,7 @@ .clone() .map(|c| Arc::new(c.clone())) .unwrap_or_default(), - io_stats, + Some(io_stats), if scan_task.sources.len() == 1 { 1 } else { 128 }, // Hardcoded for to 128 bulk reads cfg.multithreaded_io, &ParquetSchemaInferenceOptions { @@ -369,7 +366,7 @@ // Perform an eager **data** read _ => { let statistics = scan_task.statistics.clone(); - let (tables, schema) = materialize_scan_task(scan_task, None, io_stats)?; + let (tables, schema) = materialize_scan_task(scan_task, None, 
Some(io_stats))?; Ok(Self::new_loaded(schema, Arc::new(tables), statistics)) } } @@ -411,15 +408,15 @@ Ok(size_bytes) } - pub(crate) fn tables_or_read( - &self, - io_stats: Option<IOStatsRef>, - ) -> crate::Result<Arc<Vec<Table>>> { + pub(crate) fn tables_or_read(&self, io_stats: IOStatsRef) -> crate::Result<Arc<Vec<Table>>> { let mut guard = self.state.lock().unwrap(); match guard.deref() { TableState::Unloaded(scan_task) => { - let (tables, _) = - materialize_scan_task(scan_task.clone(), Some(self.schema.clone()), io_stats)?; + let (tables, _) = materialize_scan_task( + scan_task.clone(), + Some(self.schema.clone()), + Some(io_stats), + )?; let table_values = Arc::new(tables); // Cache future accesses by setting the state to TableState::Loaded @@ -431,8 +428,8 @@ } } - pub(crate) fn concat_or_get(&self) -> crate::Result<Arc<Vec<Table>>> { - let tables = self.tables_or_read(None)?; + pub(crate) fn concat_or_get(&self, io_stats: IOStatsRef) -> crate::Result<Arc<Vec<Table>>> { + let tables = self.tables_or_read(io_stats)?; if tables.len() <= 1 { return Ok(tables); } diff --git a/src/daft-micropartition/src/ops/agg.rs b/src/daft-micropartition/src/ops/agg.rs index 695a709e13..618d7a13e5 100644 --- a/src/daft-micropartition/src/ops/agg.rs +++ b/src/daft-micropartition/src/ops/agg.rs @@ -1,12 +1,15 @@ use common_error::DaftResult; use daft_dsl::Expr; +use daft_io::IOStatsContext; use daft_table::Table; use crate::micropartition::MicroPartition; impl MicroPartition { pub fn agg(&self, to_agg: &[Expr], group_by: &[Expr]) -> DaftResult<Self> { - let tables = self.concat_or_get()?; + let io_stats = IOStatsContext::new("MicroPartition::agg"); + + let tables = self.concat_or_get(io_stats)?; match tables.as_slice() { [] => { diff --git a/src/daft-micropartition/src/ops/concat.rs b/src/daft-micropartition/src/ops/concat.rs index 9901a02d35..8e23360d36 100644 --- a/src/daft-micropartition/src/ops/concat.rs +++ b/src/daft-micropartition/src/ops/concat.rs @@ -1,6 +1,7 @@ use std::sync::Mutex; use 
common_error::{DaftError, DaftResult}; +use daft_io::IOStatsContext; use crate::micropartition::{MicroPartition, TableState}; @@ -26,10 +27,12 @@ } } + let io_stats = IOStatsContext::new("MicroPartition::concat"); + let mut all_tables = vec![]; for m in mps.iter() { - let tables = m.tables_or_read(None)?; + let tables = m.tables_or_read(io_stats.clone())?; all_tables.extend_from_slice(tables.as_slice()); } let mut all_stats = None; diff --git a/src/daft-micropartition/src/ops/eval_expressions.rs b/src/daft-micropartition/src/ops/eval_expressions.rs index d193707289..0704d9895b 100644 --- a/src/daft-micropartition/src/ops/eval_expressions.rs +++ b/src/daft-micropartition/src/ops/eval_expressions.rs @@ -3,6 +3,7 @@ use std::{collections::HashSet, sync::Arc}; use common_error::{DaftError, DaftResult}; use daft_core::schema::Schema; use daft_dsl::Expr; +use daft_io::IOStatsContext; use snafu::ResultExt; use crate::{micropartition::MicroPartition, DaftCoreComputeSnafu}; @@ -30,8 +31,10 @@ fn infer_schema(exprs: &[Expr], schema: &Schema) -> DaftResult<Schema> { impl MicroPartition { pub fn eval_expression_list(&self, exprs: &[Expr]) -> DaftResult<Self> { + let io_stats = IOStatsContext::new("MicroPartition::eval_expression_list"); + let expected_schema = infer_schema(exprs, &self.schema)?; - let tables = self.tables_or_read(None)?; + let tables = self.tables_or_read(io_stats)?; let evaluated_tables = tables .iter() .map(|t| t.eval_expression_list(exprs)) .collect::<DaftResult<Vec<_>>>() @@ -51,7 +54,9 @@ } pub fn explode(&self, exprs: &[Expr]) -> DaftResult<Self> { - let tables = self.tables_or_read(None)?; + let io_stats = IOStatsContext::new("MicroPartition::explode"); + + let tables = self.tables_or_read(io_stats)?; let evaluated_tables = tables .iter() .map(|t| t.explode(exprs)) diff --git a/src/daft-micropartition/src/ops/filter.rs b/src/daft-micropartition/src/ops/filter.rs index 85e5d705f3..b49df69c95 100644 --- a/src/daft-micropartition/src/ops/filter.rs +++ 
b/src/daft-micropartition/src/ops/filter.rs @@ -1,5 +1,6 @@ use common_error::DaftResult; use daft_dsl::Expr; +use daft_io::IOStatsContext; use snafu::ResultExt; use crate::{micropartition::MicroPartition, DaftCoreComputeSnafu}; @@ -8,6 +9,7 @@ use daft_stats::TruthValue; impl MicroPartition { pub fn filter(&self, predicate: &[Expr]) -> DaftResult<Self> { + let io_stats = IOStatsContext::new("MicroPartition::filter"); if predicate.is_empty() { return Ok(Self::empty(Some(self.schema.clone()))); } @@ -26,7 +28,7 @@ } // TODO figure out defered IOStats let tables = self - .tables_or_read(None)? + .tables_or_read(io_stats)? .iter() .map(|t| t.filter(predicate)) .collect::<DaftResult<Vec<_>>>() diff --git a/src/daft-micropartition/src/ops/join.rs b/src/daft-micropartition/src/ops/join.rs index 71b6a5d2cf..235c9c44ae 100644 --- a/src/daft-micropartition/src/ops/join.rs +++ b/src/daft-micropartition/src/ops/join.rs @@ -1,6 +1,7 @@ use common_error::DaftResult; use daft_core::array::ops::DaftCompare; use daft_dsl::Expr; +use daft_io::IOStatsContext; use daft_table::infer_join_schema; use crate::micropartition::MicroPartition; @@ -33,9 +34,10 @@ impl MicroPartition { if let TruthValue::False = tv { return Ok(Self::empty(Some(join_schema.into()))); } + let io_stats = IOStatsContext::new("MicroPartition::join"); - let lt = self.concat_or_get()?; - let rt = right.concat_or_get()?; + let lt = self.concat_or_get(io_stats.clone())?; + let rt = right.concat_or_get(io_stats)?; match (lt.as_slice(), rt.as_slice()) { ([], _) | (_, []) => Ok(Self::empty(Some(join_schema.into()))), diff --git a/src/daft-micropartition/src/ops/partition.rs b/src/daft-micropartition/src/ops/partition.rs index 4b4af2150e..b7899fe3f9 100644 --- a/src/daft-micropartition/src/ops/partition.rs +++ b/src/daft-micropartition/src/ops/partition.rs @@ -2,6 +2,7 @@ use std::sync::Arc; use common_error::DaftResult; use daft_dsl::Expr; +use daft_io::IOStatsContext; use daft_table::Table; use 
crate::micropartition::MicroPartition; @@ -45,7 +46,9 @@ exprs: &[Expr], num_partitions: usize, ) -> DaftResult<Vec<Self>> { - let tables = self.tables_or_read(None)?; + let io_stats = IOStatsContext::new("MicroPartition::partition_by_hash"); + + let tables = self.tables_or_read(io_stats)?; if tables.is_empty() { return Ok( @@ -63,7 +66,9 @@ } pub fn partition_by_random(&self, num_partitions: usize, seed: u64) -> DaftResult<Vec<Self>> { - let tables = self.tables_or_read(None)?; + let io_stats = IOStatsContext::new("MicroPartition::partition_by_random"); + + let tables = self.tables_or_read(io_stats)?; if tables.is_empty() { return Ok( @@ -87,7 +92,9 @@ boundaries: &Table, descending: &[bool], ) -> DaftResult<Vec<Self>> { - let tables = self.tables_or_read(None)?; + let io_stats = IOStatsContext::new("MicroPartition::partition_by_range"); + + let tables = self.tables_or_read(io_stats)?; if tables.is_empty() { let num_partitions = boundaries.len() + 1; diff --git a/src/daft-micropartition/src/ops/slice.rs b/src/daft-micropartition/src/ops/slice.rs index 37587fd919..b16d5e7846 100644 --- a/src/daft-micropartition/src/ops/slice.rs +++ b/src/daft-micropartition/src/ops/slice.rs @@ -1,10 +1,13 @@ use common_error::DaftResult; +use daft_io::IOStatsContext; use crate::micropartition::MicroPartition; impl MicroPartition { pub fn slice(&self, start: usize, end: usize) -> DaftResult<Self> { - let tables = self.tables_or_read(None)?; + let io_stats = IOStatsContext::new(format!("MicroPartition::slice {start}-{end}")); + + let tables = self.tables_or_read(io_stats)?; let mut slices_tables = vec![]; let mut rows_needed = (end - start).max(0); let mut offset_so_far = start; diff --git a/src/daft-micropartition/src/ops/sort.rs b/src/daft-micropartition/src/ops/sort.rs index fe0d7921ac..62858f6c1f 100644 --- a/src/daft-micropartition/src/ops/sort.rs +++ b/src/daft-micropartition/src/ops/sort.rs @@ -3,13 +3,16 @@ use std::sync::Arc; use 
common_error::DaftResult; use daft_core::Series; use daft_dsl::Expr; +use daft_io::IOStatsContext; use daft_table::Table; use crate::micropartition::MicroPartition; impl MicroPartition { pub fn sort(&self, sort_keys: &[Expr], descending: &[bool]) -> DaftResult<Self> { - let tables = self.concat_or_get()?; + let io_stats = IOStatsContext::new("MicroPartition::sort"); + + let tables = self.concat_or_get(io_stats)?; match tables.as_slice() { [] => Ok(Self::empty(Some(self.schema.clone()))), [single] => { @@ -25,7 +28,9 @@ } pub fn argsort(&self, sort_keys: &[Expr], descending: &[bool]) -> DaftResult<Series> { - let tables = self.concat_or_get()?; + let io_stats = IOStatsContext::new("MicroPartition::argsort"); + + let tables = self.concat_or_get(io_stats)?; match tables.as_slice() { [] => { let empty_table = Table::empty(Some(self.schema.clone()))?; diff --git a/src/daft-micropartition/src/ops/take.rs b/src/daft-micropartition/src/ops/take.rs index 1c2d986673..bc412b42c7 100644 --- a/src/daft-micropartition/src/ops/take.rs +++ b/src/daft-micropartition/src/ops/take.rs @@ -2,13 +2,16 @@ use std::sync::Arc; use common_error::DaftResult; use daft_core::Series; +use daft_io::IOStatsContext; use daft_table::Table; use crate::micropartition::MicroPartition; impl MicroPartition { pub fn take(&self, idx: &Series) -> DaftResult<Self> { - let tables = self.concat_or_get()?; + let io_stats = IOStatsContext::new("MicroPartition::take"); + + let tables = self.concat_or_get(io_stats)?; match tables.as_slice() { // Fallback onto `[empty_table]` behavior [] => { @@ -33,7 +36,9 @@ } pub fn sample(&self, num: usize) -> DaftResult<Self> { - let tables = self.concat_or_get()?; + let io_stats = IOStatsContext::new(format!("MicroPartition::sample({num})")); + + let tables = self.concat_or_get(io_stats)?; match tables.as_slice() { [] => Ok(Self::empty(Some(self.schema.clone()))), @@ -50,7 +55,9 @@ } pub fn quantiles(&self, num: usize) -> DaftResult<Self> { 
- let tables = self.concat_or_get()?; + let io_stats = IOStatsContext::new(format!("MicroPartition::quantiles({num})")); + + let tables = self.concat_or_get(io_stats)?; match tables.as_slice() { [] => Ok(Self::empty(Some(self.schema.clone()))), [single] => { diff --git a/src/daft-micropartition/src/python.rs b/src/daft-micropartition/src/python.rs index 780fb481a7..f4545466f5 100644 --- a/src/daft-micropartition/src/python.rs +++ b/src/daft-micropartition/src/python.rs @@ -41,7 +41,10 @@ impl PyMicroPartition { } pub fn get_column(&self, name: &str, py: Python) -> PyResult<PySeries> { - let tables = py.allow_threads(|| self.inner.concat_or_get())?; + let tables = py.allow_threads(|| { + let io_stats = IOStatsContext::new(format!("PyMicroPartition::get_column: {name}")); + self.inner.concat_or_get(io_stats) + })?; let columns = tables .iter() .map(|t| t.get_column(name)) @@ -77,7 +80,7 @@ "MicroPartition::from_scan_task for {:?}", scan_task.0.sources )); - MicroPartition::from_scan_task(scan_task.into(), Some(io_stats)) + MicroPartition::from_scan_task(scan_task.into(), io_stats) })? .into()) } @@ -125,7 +128,10 @@ // Export Methods pub fn to_table(&self, py: Python) -> PyResult<PyTable> { - let concatted = py.allow_threads(|| self.inner.concat_or_get())?; + let concatted = py.allow_threads(|| { + let io_stats = IOStatsContext::new("PyMicroPartition::to_table"); + self.inner.concat_or_get(io_stats) + })?; match &concatted.as_ref()[..] 
{ [] => PyTable::empty(Some(self.schema()?)), [table] => Ok(PyTable { diff --git a/src/daft-parquet/src/python.rs b/src/daft-parquet/src/python.rs index 2b2f04cc62..73f202feb0 100644 --- a/src/daft-parquet/src/python.rs +++ b/src/daft-parquet/src/python.rs @@ -133,7 +133,7 @@ pub mod pylib { coerce_int96_timestamp_unit: Option<PyTimeUnit>, ) -> PyResult<Vec<PyTable>> { py.allow_threads(|| { - let io_stats = IOStatsContext::new("read_parquet_bulk".to_string()); + let io_stats = IOStatsContext::new("read_parquet_bulk"); let io_client = get_io_client( multithreaded_io.unwrap_or(true), @@ -243,7 +243,7 @@ multithreaded_io: Option<bool>, ) -> PyResult<PyTable> { py.allow_threads(|| { - let io_stats = IOStatsContext::new("read_parquet_statistics".to_string()); + let io_stats = IOStatsContext::new("read_parquet_statistics"); let io_client = get_io_client( multithreaded_io.unwrap_or(true),