Skip to content

Commit

Permalink
[PERF] scan task in memory estimate (#1901)
Browse files Browse the repository at this point in the history
* Closes: #1898

1. When column stats are provided, use only the columns in the
   materialized schema to estimate the in-memory size; when column stats
   are missing for a field, fall back on the schema-based estimate for
   that field.
2. When num_rows is provided, use the materialized schema to estimate
   the in-memory size.
3. When neither is provided, estimate the in-memory size using an
   inflation factor (the same one our writes use) and approximate the
   number of rows, then use the materialized schema to estimate the
   in-memory size.
4. Thread the new in-memory estimator through to the ScanWithTask
   physical op.
  • Loading branch information
samster25 committed Feb 27, 2024
1 parent 24a17dc commit 1b4c11c
Show file tree
Hide file tree
Showing 11 changed files with 145 additions and 20 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions daft/daft.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -595,6 +595,11 @@ class ScanTask:
Get number of bytes that will be scanned by this ScanTask.
"""
...
def estimate_in_memory_size_bytes(self, cfg: PyDaftExecutionConfig) -> int:
    """
    Estimate the in-memory size in bytes of the data this ScanTask will produce.

    ``cfg`` supplies execution settings (e.g. format inflation factors) used
    when only the on-disk file size is known.
    """
    ...
@staticmethod
def catalog_scan_task(
file: str,
Expand Down
37 changes: 37 additions & 0 deletions src/daft-core/src/datatypes/dtype.rs
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,43 @@ impl DataType {
}
}

/// Estimate the average in-memory size in bytes of a single value of this
/// dtype, including its share of the validity bitmap. Returns `None` for
/// types with no size heuristic.
pub fn estimate_size_bytes(&self) -> Option<f64> {
    // Heuristic width for variable-length values (strings, binary).
    const VARIABLE_TYPE_SIZE: f64 = 20.;
    // Assumed average element count for variable-length lists.
    const DEFAULT_LIST_LEN: f64 = 4.;
    // One validity bit per value.
    const BITMAP_SIZE: f64 = 0.125;

    let elem_size = match self.to_physical() {
        DataType::Null => Some(0.),
        DataType::Boolean => Some(0.125),
        DataType::Int8 | DataType::UInt8 => Some(1.),
        DataType::Int16 | DataType::UInt16 => Some(2.),
        DataType::Int32 | DataType::UInt32 | DataType::Float32 => Some(4.),
        DataType::Int64 | DataType::UInt64 | DataType::Float64 => Some(8.),
        DataType::Int128 => Some(16.),
        DataType::Utf8 | DataType::Binary => Some(VARIABLE_TYPE_SIZE),
        DataType::FixedSizeList(dtype, len) => {
            dtype.estimate_size_bytes().map(|b| b * (len as f64))
        }
        DataType::List(dtype) => dtype.estimate_size_bytes().map(|b| b * DEFAULT_LIST_LEN),
        DataType::Struct(fields) => Some(
            fields
                .iter()
                // Fields with no estimate contribute 0 to the struct total.
                .map(|f| f.dtype.estimate_size_bytes().unwrap_or(0f64))
                .sum(),
        ),
        DataType::Extension(_, dtype, _) => dtype.estimate_size_bytes(),
        _ => None,
    };
    // Account for the validity bitmap on top of the element payload.
    elem_size.map(|e| e + BITMAP_SIZE)
}

#[inline]
pub fn is_logical(&self) -> bool {
matches!(
Expand Down
7 changes: 7 additions & 0 deletions src/daft-core/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,13 @@ impl Schema {
);
format!("{}\n", table)
}

/// Estimate the in-memory size in bytes of a single row of this schema,
/// summing the per-dtype estimates of all fields (fields with no estimate
/// contribute 0).
pub fn estimate_row_size_bytes(&self) -> f64 {
    let mut total = 0.;
    for field in self.fields.values() {
        total += field.dtype.estimate_size_bytes().unwrap_or(0.);
    }
    total
}
}

impl Eq for Schema {}
Expand Down
8 changes: 3 additions & 5 deletions src/daft-micropartition/src/micropartition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -453,11 +453,9 @@ impl MicroPartition {
.iter()
.sum();
Some(total_size)
} else if let Some(stats) = &self.statistics {
let row_size = stats.estimate_row_size()?;
Some(row_size * self.len())
} else if let TableState::Unloaded(scan_task) = guard.deref() && let Some(size_bytes_on_disk) = scan_task.size_bytes_on_disk {
Some(size_bytes_on_disk as usize)
} else if let TableState::Unloaded(scan_task) = guard.deref() {
// TODO: pass in the execution config once we have it available
scan_task.estimate_in_memory_size_bytes(None)
} else {
// If the table is not loaded, we don't have stats, and we don't have the file size in bytes, return None.
// TODO(Clark): Should we pull in the table or trigger a file metadata fetch instead of returning None here?
Expand Down
3 changes: 2 additions & 1 deletion src/daft-scan/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
[dependencies]
bincode = {workspace = true}
common-daft-config = {path = "../common/daft-config", default-features = false}
common-error = {path = "../common/error", default-features = false}
common-io-config = {path = "../common/io-config", default-features = false}
daft-core = {path = "../daft-core", default-features = false}
Expand All @@ -21,7 +22,7 @@ tokio = {workspace = true}

[features]
default = ["python"]
python = ["dep:pyo3", "common-error/python", "daft-core/python", "daft-dsl/python", "daft-table/python", "daft-stats/python", "common-io-config/python"]
python = ["dep:pyo3", "common-error/python", "daft-core/python", "daft-dsl/python", "daft-table/python", "daft-stats/python", "common-io-config/python", "common-daft-config/python"]

[package]
edition = {workspace = true}
Expand Down
48 changes: 45 additions & 3 deletions src/daft-scan/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#![feature(if_let_guard)]
#![feature(let_chains)]
use std::{
borrow::Cow,
fmt::{Debug, Display},
hash::{Hash, Hasher},
sync::Arc,
Expand All @@ -21,6 +22,7 @@ mod anonymous;
pub use anonymous::AnonymousScanOperator;
pub mod file_format;
mod glob;
use common_daft_config::DaftExecutionConfig;
#[cfg(feature = "python")]
pub mod py_object_serde;
pub mod scan_task_iters;
Expand Down Expand Up @@ -385,15 +387,55 @@ impl ScanTask {
}

/// On-disk size of this scan task in bytes, when known.
pub fn size_bytes(&self) -> Option<usize> {
    match self.size_bytes_on_disk {
        Some(on_disk_bytes) => Some(on_disk_bytes as usize),
        None => None,
    }
}

/// Estimate the in-memory size in bytes of the data this `ScanTask` will
/// materialize, trying progressively coarser sources of information:
/// 1. column statistics + row count (row size restricted to the
///    materialized schema),
/// 2. row count alone (dtype-based row-size estimate from the
///    materialized schema),
/// 3. on-disk file size scaled by a format-specific inflation factor from
///    `config` (falling back to `DaftExecutionConfig::default()` when no
///    config is supplied).
///
/// Returns `None` when none of these inputs are available.
///
/// NOTE: the rendered diff had the removed pre-commit lines interleaved
/// with the new ones (duplicate stats path and on-disk fallback); this is
/// the reconstructed post-commit body.
pub fn estimate_in_memory_size_bytes(
    &self,
    config: Option<&DaftExecutionConfig>,
) -> Option<usize> {
    let mat_schema = self.materialized_schema();
    self.statistics
        .as_ref()
        .and_then(|s| {
            // Derive in-memory size estimate from table stats.
            self.num_rows().and_then(|num_rows| {
                let row_size = s.estimate_row_size(Some(mat_schema.as_ref())).ok()?;
                let estimate = (num_rows as f64) * row_size;
                Some(estimate as usize)
            })
        })
        .or_else(|| {
            // If num_rows is provided, use it with the materialized-schema row size.
            self.num_rows().map(|num_rows| {
                let row_size = mat_schema.estimate_row_size_bytes();
                let estimate = (num_rows as f64) * row_size;
                estimate as usize
            })
        })
        .or_else(|| {
            self.size_bytes_on_disk.map(|file_size| {
                // Use the inflation factor from the config, defaulting when absent.
                let config = config
                    .map_or_else(|| Cow::Owned(DaftExecutionConfig::default()), Cow::Borrowed);
                let inflation_factor = match self.file_format_config.as_ref() {
                    FileFormatConfig::Parquet(_) => config.parquet_inflation_factor,
                    FileFormatConfig::Csv(_) | FileFormatConfig::Json(_) => {
                        config.csv_inflation_factor
                    }
                };

                // Approximate the number of rows from the read schema.
                let in_mem_size: f64 = (file_size as f64) * inflation_factor;
                let read_row_size = self.schema.estimate_row_size_bytes();
                let approx_rows = in_mem_size / read_row_size;

                // Estimate in-memory size using the materialized-schema row size.
                let proj_schema_size = mat_schema.estimate_row_size_bytes();
                (approx_rows * proj_schema_size) as usize
            })
        })
}

pub fn partition_spec(&self) -> Option<&PartitionSpec> {
Expand Down
13 changes: 12 additions & 1 deletion src/daft-scan/src/python.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ pub mod pylib {
use crate::file_format::PyFileFormatConfig;
use crate::glob::GlobScanOperator;
use crate::storage_config::PyStorageConfig;

use common_daft_config::PyDaftExecutionConfig;
#[pyclass(module = "daft.daft", frozen)]
#[derive(Debug, Clone)]
pub struct ScanOperatorHandle {
Expand Down Expand Up @@ -245,6 +245,17 @@ pub mod pylib {
/// Python binding: on-disk size in bytes of this ScanTask, when known.
pub fn size_bytes(&self) -> PyResult<Option<i64>> {
    let maybe_bytes = self.0.size_bytes();
    // transpose() surfaces an i64 conversion failure as a PyErr via `?`.
    Ok(maybe_bytes.map(i64::try_from).transpose()?)
}

/// Python binding: estimate the in-memory size in bytes of this ScanTask,
/// using the provided execution config.
pub fn estimate_in_memory_size_bytes(
    &self,
    cfg: PyDaftExecutionConfig,
) -> PyResult<Option<i64>> {
    let estimate = self.0.estimate_in_memory_size_bytes(Some(cfg.config.as_ref()));
    // transpose() surfaces an i64 conversion failure as a PyErr via `?`.
    Ok(estimate.map(i64::try_from).transpose()?)
}
}

#[pymethods]
Expand Down
12 changes: 7 additions & 5 deletions src/daft-stats/src/column_stats/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,12 +118,14 @@ impl ColumnRangeStatistics {
}
}

pub(crate) fn element_size(&self) -> crate::Result<usize> {
/// Average of the byte sizes of the lower and upper bound values for this
/// column; `None` when the statistics are missing.
///
/// NOTE: the rendered diff still showed the removed pre-commit match arms
/// (`Ok(0)` / `usize` averaging) alongside the new ones, which is invalid
/// Rust (duplicate patterns); this is the reconstructed post-commit body.
pub(crate) fn element_size(&self) -> crate::Result<Option<f64>> {
    match self {
        Self::Missing => Ok(None),
        Self::Loaded(l, u) => Ok(Some(
            ((l.size_bytes().context(DaftCoreComputeSnafu)?
                + u.size_bytes().context(DaftCoreComputeSnafu)?) as f64)
                / 2.,
        )),
    }
}

Expand Down
1 change: 1 addition & 0 deletions src/daft-stats/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#![feature(let_chains)]
use common_error::DaftError;
use snafu::Snafu;

Expand Down
30 changes: 25 additions & 5 deletions src/daft-stats/src/table_stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,32 @@ impl TableStatistics {
})
}

pub fn estimate_row_size(&self) -> super::Result<usize> {
let mut sum_so_far = 0;

for elem_size in self.columns.values().map(|c| c.element_size()) {
sum_so_far += elem_size?;
pub fn estimate_row_size(&self, schema: Option<&Schema>) -> super::Result<f64> {
let mut sum_so_far = 0.;

if let Some(schema) = schema {
// if schema provided, use it
for field in schema.fields.values() {
let name = field.name.as_str();
let elem_size = if let Some(stats) = self.columns.get(name) {
// first try to use column stats
stats.element_size()?
} else {
None
}
.or_else(|| {
// failover to use dtype estimate
field.dtype.estimate_size_bytes()
})
.unwrap_or(0.);
sum_so_far += elem_size;
}
} else {
for elem_size in self.columns.values().map(|c| c.element_size()) {
sum_so_far += elem_size?.unwrap_or(0.);
}
}

Ok(sum_so_far)
}

Expand Down

0 comments on commit 1b4c11c

Please sign in to comment.