diff --git a/src/daft-micropartition/src/micropartition.rs b/src/daft-micropartition/src/micropartition.rs index 866ea25da7..ede1aa42fd 100644 --- a/src/daft-micropartition/src/micropartition.rs +++ b/src/daft-micropartition/src/micropartition.rs @@ -411,15 +411,15 @@ impl MicroPartition { Ok(size_bytes) } - pub(crate) fn tables_or_read( - &self, - io_stats: Option, - ) -> crate::Result>> { + pub(crate) fn tables_or_read(&self, io_stats: IOStatsRef) -> crate::Result>> { let mut guard = self.state.lock().unwrap(); match guard.deref() { TableState::Unloaded(scan_task) => { - let (tables, _) = - materialize_scan_task(scan_task.clone(), Some(self.schema.clone()), io_stats)?; + let (tables, _) = materialize_scan_task( + scan_task.clone(), + Some(self.schema.clone()), + Some(io_stats), + )?; let table_values = Arc::new(tables); // Cache future accesses by setting the state to TableState::Loaded @@ -431,10 +431,7 @@ impl MicroPartition { } } - pub(crate) fn concat_or_get( - &self, - io_stats: Option, - ) -> crate::Result>> { + pub(crate) fn concat_or_get(&self, io_stats: IOStatsRef) -> crate::Result>> { let tables = self.tables_or_read(io_stats)?; if tables.len() <= 1 { return Ok(tables); diff --git a/src/daft-micropartition/src/ops/agg.rs b/src/daft-micropartition/src/ops/agg.rs index 382da77789..290a49197e 100644 --- a/src/daft-micropartition/src/ops/agg.rs +++ b/src/daft-micropartition/src/ops/agg.rs @@ -9,7 +9,7 @@ impl MicroPartition { pub fn agg(&self, to_agg: &[Expr], group_by: &[Expr]) -> DaftResult { let io_stats = IOStatsContext::new(format!("MicroPartition::agg")); - let tables = self.concat_or_get(Some(io_stats))?; + let tables = self.concat_or_get(io_stats)?; match tables.as_slice() { [] => { diff --git a/src/daft-micropartition/src/ops/concat.rs b/src/daft-micropartition/src/ops/concat.rs index 53a1942a55..1a2112eb90 100644 --- a/src/daft-micropartition/src/ops/concat.rs +++ b/src/daft-micropartition/src/ops/concat.rs @@ -32,7 +32,7 @@ impl MicroPartition { let mut all_tables = vec![]; for m in mps.iter() { - let tables = m.tables_or_read(Some(io_stats.clone()))?; + let tables = m.tables_or_read(io_stats.clone())?; all_tables.extend_from_slice(tables.as_slice()); } let mut all_stats = None; diff --git a/src/daft-micropartition/src/ops/eval_expressions.rs b/src/daft-micropartition/src/ops/eval_expressions.rs index c8dcfce87e..1e81d05f4d 100644 --- a/src/daft-micropartition/src/ops/eval_expressions.rs +++ b/src/daft-micropartition/src/ops/eval_expressions.rs @@ -34,7 +34,7 @@ impl MicroPartition { let io_stats = IOStatsContext::new(format!("MicroPartition::eval_expression_list")); let expected_schema = infer_schema(exprs, &self.schema)?; - let tables = self.tables_or_read(Some(io_stats))?; + let tables = self.tables_or_read(io_stats)?; let evaluated_tables = tables .iter() .map(|t| t.eval_expression_list(exprs)) @@ -56,7 +56,7 @@ impl MicroPartition { pub fn explode(&self, exprs: &[Expr]) -> DaftResult { let io_stats = IOStatsContext::new(format!("MicroPartition::explode")); - let tables = self.tables_or_read(Some(io_stats))?; + let tables = self.tables_or_read(io_stats)?; let evaluated_tables = tables .iter() .map(|t| t.explode(exprs)) diff --git a/src/daft-micropartition/src/ops/filter.rs b/src/daft-micropartition/src/ops/filter.rs index c94792099a..56f5acf7e7 100644 --- a/src/daft-micropartition/src/ops/filter.rs +++ b/src/daft-micropartition/src/ops/filter.rs @@ -28,7 +28,7 @@ impl MicroPartition { } // TODO figure out defered IOStats let tables = self - .tables_or_read(Some(io_stats))? + .tables_or_read(io_stats)? .iter() .map(|t| t.filter(predicate)) .collect::>>() diff --git a/src/daft-micropartition/src/ops/join.rs b/src/daft-micropartition/src/ops/join.rs index 4eeee30155..49fce45e52 100644 --- a/src/daft-micropartition/src/ops/join.rs +++ b/src/daft-micropartition/src/ops/join.rs @@ -34,10 +34,10 @@ impl MicroPartition { if let TruthValue::False = tv { return Ok(Self::empty(Some(join_schema.into()))); } - let io_stats = IOStatsContext::new(format!("MicroPartition::join:")); + let io_stats = IOStatsContext::new(format!("MicroPartition::join")); - let lt = self.concat_or_get(Some(io_stats.clone()))?; - let rt = right.concat_or_get(Some(io_stats))?; + let lt = self.concat_or_get(io_stats.clone())?; + let rt = right.concat_or_get(io_stats)?; match (lt.as_slice(), rt.as_slice()) { ([], _) | (_, []) => Ok(Self::empty(Some(join_schema.into()))), diff --git a/src/daft-micropartition/src/ops/partition.rs b/src/daft-micropartition/src/ops/partition.rs index 54de355438..9d07343304 100644 --- a/src/daft-micropartition/src/ops/partition.rs +++ b/src/daft-micropartition/src/ops/partition.rs @@ -48,7 +48,7 @@ impl MicroPartition { ) -> DaftResult> { let io_stats = IOStatsContext::new(format!("MicroPartition::partition_by_hash")); - let tables = self.tables_or_read(Some(io_stats))?; + let tables = self.tables_or_read(io_stats)?; if tables.is_empty() { return Ok( @@ -68,7 +68,7 @@ impl MicroPartition { pub fn partition_by_random(&self, num_partitions: usize, seed: u64) -> DaftResult> { let io_stats = IOStatsContext::new(format!("MicroPartition::partition_by_random")); - let tables = self.tables_or_read(Some(io_stats))?; + let tables = self.tables_or_read(io_stats)?; if tables.is_empty() { return Ok( @@ -94,7 +94,7 @@ impl MicroPartition { ) -> DaftResult> { let io_stats = IOStatsContext::new(format!("MicroPartition::partition_by_range")); - let tables = self.tables_or_read(Some(io_stats))?; + let tables = self.tables_or_read(io_stats)?; if tables.is_empty() { let num_partitions = boundaries.len() + 1; diff --git a/src/daft-micropartition/src/ops/slice.rs b/src/daft-micropartition/src/ops/slice.rs index 0ff52f0861..b16d5e7846 100644 --- a/src/daft-micropartition/src/ops/slice.rs +++ b/src/daft-micropartition/src/ops/slice.rs @@ -7,7 +7,7 @@ impl MicroPartition { pub fn slice(&self, start: usize, end: usize) -> DaftResult { let io_stats = IOStatsContext::new(format!("MicroPartition::slice {start}-{end}")); - let tables = self.tables_or_read(Some(io_stats))?; + let tables = self.tables_or_read(io_stats)?; let mut slices_tables = vec![]; let mut rows_needed = (end - start).max(0); let mut offset_so_far = start; diff --git a/src/daft-micropartition/src/ops/sort.rs b/src/daft-micropartition/src/ops/sort.rs index 3b2081b23b..476e00674c 100644 --- a/src/daft-micropartition/src/ops/sort.rs +++ b/src/daft-micropartition/src/ops/sort.rs @@ -10,9 +10,9 @@ use crate::micropartition::MicroPartition; impl MicroPartition { pub fn sort(&self, sort_keys: &[Expr], descending: &[bool]) -> DaftResult { - let io_stats = IOStatsContext::new(format!("MicroPartition::sort:")); + let io_stats = IOStatsContext::new(format!("MicroPartition::sort")); - let tables = self.concat_or_get(Some(io_stats))?; + let tables = self.concat_or_get(io_stats)?; match tables.as_slice() { [] => Ok(Self::empty(Some(self.schema.clone()))), [single] => { @@ -28,9 +28,9 @@ impl MicroPartition { } pub fn argsort(&self, sort_keys: &[Expr], descending: &[bool]) -> DaftResult { - let io_stats = IOStatsContext::new(format!("MicroPartition::argsort:")); + let io_stats = IOStatsContext::new(format!("MicroPartition::argsort")); - let tables = self.concat_or_get(Some(io_stats))?; + let tables = self.concat_or_get(io_stats)?; match tables.as_slice() { [] => { let empty_table = Table::empty(Some(self.schema.clone()))?; diff --git a/src/daft-micropartition/src/ops/take.rs b/src/daft-micropartition/src/ops/take.rs index 1a8698f264..ffa190c163 100644 --- a/src/daft-micropartition/src/ops/take.rs +++ b/src/daft-micropartition/src/ops/take.rs @@ -11,7 +11,7 @@ impl MicroPartition { pub fn take(&self, idx: &Series) -> DaftResult { let io_stats = IOStatsContext::new(format!("MicroPartition::take")); - let tables = self.concat_or_get(Some(io_stats))?; + let tables = self.concat_or_get(io_stats)?; match tables.as_slice() { // Fallback onto `[empty_table]` behavior [] => { @@ -38,7 +38,7 @@ impl MicroPartition { pub fn sample(&self, num: usize) -> DaftResult { let io_stats = IOStatsContext::new(format!("MicroPartition::sample({num})")); - let tables = self.concat_or_get(Some(io_stats))?; + let tables = self.concat_or_get(io_stats)?; match tables.as_slice() { [] => Ok(Self::empty(Some(self.schema.clone()))), @@ -57,7 +57,7 @@ impl MicroPartition { pub fn quantiles(&self, num: usize) -> DaftResult { let io_stats = IOStatsContext::new(format!("MicroPartition::quantiles({num})")); - let tables = self.concat_or_get(Some(io_stats))?; + let tables = self.concat_or_get(io_stats)?; match tables.as_slice() { [] => Ok(Self::empty(Some(self.schema.clone()))), [single] => { diff --git a/src/daft-micropartition/src/python.rs b/src/daft-micropartition/src/python.rs index 51eff1f7aa..97a768b66c 100644 --- a/src/daft-micropartition/src/python.rs +++ b/src/daft-micropartition/src/python.rs @@ -43,7 +43,7 @@ impl PyMicroPartition { pub fn get_column(&self, name: &str, py: Python) -> PyResult { let tables = py.allow_threads(|| { let io_stats = IOStatsContext::new(format!("PyMicroPartition::get_column: {name}")); - self.inner.concat_or_get(Some(io_stats)) + self.inner.concat_or_get(io_stats) })?; let columns = tables .iter() @@ -130,7 +130,7 @@ impl PyMicroPartition { pub fn to_table(&self, py: Python) -> PyResult { let concatted = py.allow_threads(|| { let io_stats = IOStatsContext::new("PyMicroPartition::to_table".to_string()); - self.inner.concat_or_get(Some(io_stats)) + self.inner.concat_or_get(io_stats) })?; match &concatted.as_ref()[..] { [] => PyTable::empty(Some(self.schema()?)),