Skip to content

Commit

Permalink
require io stats
Browse files Browse the repository at this point in the history
  • Loading branch information
samster25 committed Nov 13, 2023
1 parent 202e5a9 commit e35267b
Show file tree
Hide file tree
Showing 11 changed files with 28 additions and 31 deletions.
17 changes: 7 additions & 10 deletions src/daft-micropartition/src/micropartition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -411,15 +411,15 @@ impl MicroPartition {
Ok(size_bytes)
}

pub(crate) fn tables_or_read(
&self,
io_stats: Option<IOStatsRef>,
) -> crate::Result<Arc<Vec<Table>>> {
pub(crate) fn tables_or_read(&self, io_stats: IOStatsRef) -> crate::Result<Arc<Vec<Table>>> {
let mut guard = self.state.lock().unwrap();
match guard.deref() {
TableState::Unloaded(scan_task) => {
let (tables, _) =
materialize_scan_task(scan_task.clone(), Some(self.schema.clone()), io_stats)?;
let (tables, _) = materialize_scan_task(
scan_task.clone(),
Some(self.schema.clone()),
Some(io_stats),
)?;
let table_values = Arc::new(tables);

// Cache future accesses by setting the state to TableState::Loaded
Expand All @@ -431,10 +431,7 @@ impl MicroPartition {
}
}

pub(crate) fn concat_or_get(
&self,
io_stats: Option<IOStatsRef>,
) -> crate::Result<Arc<Vec<Table>>> {
pub(crate) fn concat_or_get(&self, io_stats: IOStatsRef) -> crate::Result<Arc<Vec<Table>>> {
let tables = self.tables_or_read(io_stats)?;
if tables.len() <= 1 {
return Ok(tables);
Expand Down
2 changes: 1 addition & 1 deletion src/daft-micropartition/src/ops/agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ impl MicroPartition {
pub fn agg(&self, to_agg: &[Expr], group_by: &[Expr]) -> DaftResult<Self> {
let io_stats = IOStatsContext::new(format!("MicroPartition::agg"));

let tables = self.concat_or_get(Some(io_stats))?;
let tables = self.concat_or_get(io_stats)?;

match tables.as_slice() {
[] => {
Expand Down
2 changes: 1 addition & 1 deletion src/daft-micropartition/src/ops/concat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ impl MicroPartition {
let mut all_tables = vec![];

for m in mps.iter() {
let tables = m.tables_or_read(Some(io_stats.clone()))?;
let tables = m.tables_or_read(io_stats.clone())?;
all_tables.extend_from_slice(tables.as_slice());
}
let mut all_stats = None;
Expand Down
4 changes: 2 additions & 2 deletions src/daft-micropartition/src/ops/eval_expressions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ impl MicroPartition {
let io_stats = IOStatsContext::new(format!("MicroPartition::eval_expression_list"));

let expected_schema = infer_schema(exprs, &self.schema)?;
let tables = self.tables_or_read(Some(io_stats))?;
let tables = self.tables_or_read(io_stats)?;
let evaluated_tables = tables
.iter()
.map(|t| t.eval_expression_list(exprs))
Expand All @@ -56,7 +56,7 @@ impl MicroPartition {
pub fn explode(&self, exprs: &[Expr]) -> DaftResult<Self> {
let io_stats = IOStatsContext::new(format!("MicroPartition::explode"));

let tables = self.tables_or_read(Some(io_stats))?;
let tables = self.tables_or_read(io_stats)?;
let evaluated_tables = tables
.iter()
.map(|t| t.explode(exprs))
Expand Down
2 changes: 1 addition & 1 deletion src/daft-micropartition/src/ops/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ impl MicroPartition {
}
// TODO figure out defered IOStats
let tables = self
.tables_or_read(Some(io_stats))?
.tables_or_read(io_stats)?
.iter()
.map(|t| t.filter(predicate))
.collect::<DaftResult<Vec<_>>>()
Expand Down
6 changes: 3 additions & 3 deletions src/daft-micropartition/src/ops/join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,10 @@ impl MicroPartition {
if let TruthValue::False = tv {
return Ok(Self::empty(Some(join_schema.into())));
}
let io_stats = IOStatsContext::new(format!("MicroPartition::join:"));
let io_stats = IOStatsContext::new(format!("MicroPartition::join"));

let lt = self.concat_or_get(Some(io_stats.clone()))?;
let rt = right.concat_or_get(Some(io_stats))?;
let lt = self.concat_or_get(io_stats.clone())?;
let rt = right.concat_or_get(io_stats)?;

match (lt.as_slice(), rt.as_slice()) {
([], _) | (_, []) => Ok(Self::empty(Some(join_schema.into()))),
Expand Down
6 changes: 3 additions & 3 deletions src/daft-micropartition/src/ops/partition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ impl MicroPartition {
) -> DaftResult<Vec<Self>> {
let io_stats = IOStatsContext::new(format!("MicroPartition::partition_by_hash"));

let tables = self.tables_or_read(Some(io_stats))?;
let tables = self.tables_or_read(io_stats)?;

if tables.is_empty() {
return Ok(
Expand All @@ -68,7 +68,7 @@ impl MicroPartition {
pub fn partition_by_random(&self, num_partitions: usize, seed: u64) -> DaftResult<Vec<Self>> {
let io_stats = IOStatsContext::new(format!("MicroPartition::partition_by_random"));

let tables = self.tables_or_read(Some(io_stats))?;
let tables = self.tables_or_read(io_stats)?;

if tables.is_empty() {
return Ok(
Expand All @@ -94,7 +94,7 @@ impl MicroPartition {
) -> DaftResult<Vec<Self>> {
let io_stats = IOStatsContext::new(format!("MicroPartition::partition_by_range"));

let tables = self.tables_or_read(Some(io_stats))?;
let tables = self.tables_or_read(io_stats)?;

if tables.is_empty() {
let num_partitions = boundaries.len() + 1;
Expand Down
2 changes: 1 addition & 1 deletion src/daft-micropartition/src/ops/slice.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ impl MicroPartition {
pub fn slice(&self, start: usize, end: usize) -> DaftResult<Self> {
let io_stats = IOStatsContext::new(format!("MicroPartition::slice {start}-{end}"));

let tables = self.tables_or_read(Some(io_stats))?;
let tables = self.tables_or_read(io_stats)?;
let mut slices_tables = vec![];
let mut rows_needed = (end - start).max(0);
let mut offset_so_far = start;
Expand Down
8 changes: 4 additions & 4 deletions src/daft-micropartition/src/ops/sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ use crate::micropartition::MicroPartition;

impl MicroPartition {
pub fn sort(&self, sort_keys: &[Expr], descending: &[bool]) -> DaftResult<Self> {
let io_stats = IOStatsContext::new(format!("MicroPartition::sort:"));
let io_stats = IOStatsContext::new(format!("MicroPartition::sort"));

let tables = self.concat_or_get(Some(io_stats))?;
let tables = self.concat_or_get(io_stats)?;
match tables.as_slice() {
[] => Ok(Self::empty(Some(self.schema.clone()))),
[single] => {
Expand All @@ -28,9 +28,9 @@ impl MicroPartition {
}

pub fn argsort(&self, sort_keys: &[Expr], descending: &[bool]) -> DaftResult<Series> {
let io_stats = IOStatsContext::new(format!("MicroPartition::argsort:"));
let io_stats = IOStatsContext::new(format!("MicroPartition::argsort"));

let tables = self.concat_or_get(Some(io_stats))?;
let tables = self.concat_or_get(io_stats)?;
match tables.as_slice() {
[] => {
let empty_table = Table::empty(Some(self.schema.clone()))?;
Expand Down
6 changes: 3 additions & 3 deletions src/daft-micropartition/src/ops/take.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ impl MicroPartition {
pub fn take(&self, idx: &Series) -> DaftResult<Self> {
let io_stats = IOStatsContext::new(format!("MicroPartition::take"));

let tables = self.concat_or_get(Some(io_stats))?;
let tables = self.concat_or_get(io_stats)?;
match tables.as_slice() {
// Fallback onto `[empty_table]` behavior
[] => {
Expand All @@ -38,7 +38,7 @@ impl MicroPartition {
pub fn sample(&self, num: usize) -> DaftResult<Self> {
let io_stats = IOStatsContext::new(format!("MicroPartition::sample({num})"));

let tables = self.concat_or_get(Some(io_stats))?;
let tables = self.concat_or_get(io_stats)?;

match tables.as_slice() {
[] => Ok(Self::empty(Some(self.schema.clone()))),
Expand All @@ -57,7 +57,7 @@ impl MicroPartition {
pub fn quantiles(&self, num: usize) -> DaftResult<Self> {
let io_stats = IOStatsContext::new(format!("MicroPartition::quantiles({num})"));

let tables = self.concat_or_get(Some(io_stats))?;
let tables = self.concat_or_get(io_stats)?;
match tables.as_slice() {
[] => Ok(Self::empty(Some(self.schema.clone()))),
[single] => {
Expand Down
4 changes: 2 additions & 2 deletions src/daft-micropartition/src/python.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ impl PyMicroPartition {
pub fn get_column(&self, name: &str, py: Python) -> PyResult<PySeries> {
let tables = py.allow_threads(|| {
let io_stats = IOStatsContext::new(format!("PyMicroPartition::get_column: {name}"));
self.inner.concat_or_get(Some(io_stats))
self.inner.concat_or_get(io_stats)
})?;
let columns = tables
.iter()
Expand Down Expand Up @@ -130,7 +130,7 @@ impl PyMicroPartition {
pub fn to_table(&self, py: Python) -> PyResult<PyTable> {
let concatted = py.allow_threads(|| {
let io_stats = IOStatsContext::new("PyMicroPartition::to_table".to_string());
self.inner.concat_or_get(Some(io_stats))
self.inner.concat_or_get(io_stats)
})?;
match &concatted.as_ref()[..] {
[] => PyTable::empty(Some(self.schema()?)),
Expand Down

0 comments on commit e35267b

Please sign in to comment.