Commit: wip

samster25 committed Oct 9, 2023
1 parent a068110 commit 87dd2bd
Showing 8 changed files with 199 additions and 19 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

5 changes: 4 additions & 1 deletion src/daft-micropartition/Cargo.toml
@@ -3,15 +3,18 @@ arrow2 = {workspace = true}
common-error = {path = "../common/error", default-features = false}
daft-core = {path = "../daft-core", default-features = false}
daft-dsl = {path = "../daft-dsl", default-features = false}
+daft-io = {path = "../daft-io", default-features = false}
+daft-parquet = {path = "../daft-parquet", default-features = false}
daft-table = {path = "../daft-table", default-features = false}
indexmap = {workspace = true, features = ["serde"]}
parquet2 = {workspace = true}
pyo3 = {workspace = true, optional = true}
+pyo3-log = {workspace = true}
snafu = {workspace = true}

[features]
default = ["python"]
python = ["dep:pyo3", "common-error/python", "daft-core/python", "daft-dsl/python"]
python = ["dep:pyo3", "common-error/python", "daft-core/python", "daft-dsl/python", "daft-table/python", "daft-io/python", "daft-parquet/python"]

[package]
edition = {workspace = true}
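The `python` feature now forwards to the new `daft-table`, `daft-io`, and `daft-parquet` dependencies, so their PyO3 bindings are compiled together with this crate's. As a hedged sketch of what consuming code gated on that feature looks like (the module and function below are made up for illustration):

```rust
// Illustrative only: anything behind the `python` feature compiles solely
// when pyo3 and the forwarded crate features are enabled.
#[cfg(feature = "python")]
pub mod python {
    use pyo3::prelude::*;

    // Hypothetical binding; not part of this commit.
    #[pyfunction]
    pub fn crate_version() -> PyResult<&'static str> {
        Ok(env!("CARGO_PKG_VERSION"))
    }
}
```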
19 changes: 13 additions & 6 deletions src/daft-micropartition/src/column_stats/mod.rs
@@ -4,12 +4,12 @@ mod logical;

use daft_core::{datatypes::BooleanArray, IntoSeries, Series};
#[derive(Clone)]
-pub struct ColumnStatistics {
-    lower: Series,
-    upper: Series,
-    count: usize,
-    null_count: usize,
-    num_bytes: usize,
+pub(crate) struct ColumnStatistics {
+    pub lower: Series,
+    pub upper: Series,
+    pub count: usize,
+    pub null_count: usize,
+    pub num_bytes: usize,
}

#[derive(PartialEq, Debug)]
@@ -110,6 +110,13 @@ num_bytes: {}
}
}

+impl std::fmt::Debug for ColumnStatistics {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "{self}")
+    }
+}

impl TryFrom<&daft_dsl::LiteralValue> for ColumnStatistics {
type Error = crate::Error;
fn try_from(value: &daft_dsl::LiteralValue) -> Result<Self, Self::Error> {
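The new `Debug` impl simply defers to the `Display` implementation that the `num_bytes: {}` context line above belongs to, so `{:?}` and `{}` render the same summary. A minimal standalone sketch of the pattern, with the fields trimmed down:

```rust
use std::fmt;

struct Stats {
    lower: i64,
    upper: i64,
}

impl fmt::Display for Stats {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "lower: {}\nupper: {}", self.lower, self.upper)
    }
}

// Debug forwards to Display, exactly as in the diff above, so
// println!("{:?}", s) and println!("{}", s) print identical output.
impl fmt::Debug for Stats {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{self}")
    }
}
```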
160 changes: 160 additions & 0 deletions src/daft-micropartition/src/micropartition.rs
@@ -1,13 +1,23 @@
use std::sync::Arc;
use std::{ops::Deref, sync::Mutex};

use arrow2::array::PrimitiveArray;
use common_error::DaftResult;
use daft_core::{IntoSeries, Series};
use daft_core::datatypes::{BooleanArray, DataArray, DaftPhysicalType, DaftNumericType, Utf8Array};
use daft_core::schema::{Schema, SchemaRef};
use daft_dsl::Expr;
use daft_parquet::read::read_parquet_metadata;
use daft_table::Table;
use indexmap::IndexMap;
use parquet2::statistics::{BooleanStatistics, PrimitiveStatistics, Statistics, BinaryStatistics};
use snafu::ResultExt;

use crate::DaftCoreComputeSnafu;
use crate::column_stats::ColumnStatistics;
use crate::{column_stats::TruthValue, table_stats::TableStatistics};
use daft_io::IOConfig;

struct DeferredLoadingParams {
filters: Vec<Expr>,
@@ -66,6 +76,7 @@ impl MicroPartition {
));
}
}

let guard = self.state.lock().unwrap();
let new_state = match guard.deref() {
TableState::Unloaded(params) => {
@@ -83,10 +94,159 @@ impl MicroPartition {
.context(DaftCoreComputeSnafu)?,
),
};

// TODO: We should also "filter" the TableStatistics so it's more accurate for downstream tasks
Ok(Self::new(
self.schema.clone(),
new_state,
self.statistics.clone(),
))
}
}

impl From<&BooleanStatistics> for ColumnStatistics {
    fn from(value: &BooleanStatistics) -> Self {
        let lower = value.min_value.unwrap();
        let upper = value.max_value.unwrap();
        let null_count = value.null_count.unwrap();
        // TODO: FIX THESE STATS
        ColumnStatistics {
            lower: BooleanArray::from(("lower", [lower].as_slice())).into_series(),
            upper: BooleanArray::from(("upper", [upper].as_slice())).into_series(),
            count: 1,
            null_count: null_count as usize,
            num_bytes: 1,
        }
    }
}

impl<T: parquet2::types::NativeType + daft_core::datatypes::NumericNative>
    From<&PrimitiveStatistics<T>> for ColumnStatistics
{
    fn from(value: &PrimitiveStatistics<T>) -> Self {
        let lower = value.min_value.unwrap();
        let upper = value.max_value.unwrap();
        let null_count = value.null_count.unwrap();
        // TODO: FIX THESE STATS
        let lower = Series::try_from((
            "lower",
            Box::new(PrimitiveArray::<T>::from_vec(vec![lower])) as Box<dyn arrow2::array::Array>,
        ))
        .unwrap();
        let upper = Series::try_from((
            "upper",
            Box::new(PrimitiveArray::<T>::from_vec(vec![upper])) as Box<dyn arrow2::array::Array>,
        ))
        .unwrap();

        ColumnStatistics {
            lower,
            upper,
            count: 1,
            null_count: null_count as usize,
            num_bytes: 1,
        }
    }
}

impl From<&BinaryStatistics> for ColumnStatistics {
    fn from(value: &BinaryStatistics) -> Self {
        let lower = value.min_value.as_ref().unwrap();
        let upper = value.max_value.as_ref().unwrap();
        let null_count = value.null_count.unwrap();
        // TODO: FIX THESE STATS

        // for now assuming they are all strings
        let lower = String::from_utf8(lower.clone()).unwrap();
        let upper = String::from_utf8(upper.clone()).unwrap();

        let lower = Utf8Array::from(("lower", [lower.as_str()].as_slice())).into_series();
        let upper = Utf8Array::from(("upper", [upper.as_str()].as_slice())).into_series();

        ColumnStatistics {
            lower,
            upper,
            count: 1,
            null_count: null_count as usize,
            num_bytes: 1,
        }
    }
}

impl From<&dyn Statistics> for ColumnStatistics {
    fn from(value: &dyn Statistics) -> Self {
        let ptype = value.physical_type();
        let stats = value.as_any();
        use parquet2::schema::types::PhysicalType;
        match ptype {
            PhysicalType::Boolean => stats.downcast_ref::<BooleanStatistics>().unwrap().into(),
            PhysicalType::Int32 => stats.downcast_ref::<PrimitiveStatistics<i32>>().unwrap().into(),
            PhysicalType::Int64 => stats.downcast_ref::<PrimitiveStatistics<i64>>().unwrap().into(),
            PhysicalType::Int96 => todo!(),
            PhysicalType::Float => stats.downcast_ref::<PrimitiveStatistics<f32>>().unwrap().into(),
            PhysicalType::Double => stats.downcast_ref::<PrimitiveStatistics<f64>>().unwrap().into(),
            PhysicalType::ByteArray => stats.downcast_ref::<BinaryStatistics>().unwrap().into(),
            PhysicalType::FixedLenByteArray(_size) => todo!(),
        }
    }
}

impl From<&daft_parquet::metadata::RowGroupMetaData> for TableStatistics {
    fn from(value: &daft_parquet::metadata::RowGroupMetaData) -> Self {
        let num_rows = value.num_rows();
        let mut columns = IndexMap::new();
        for col in value.columns() {
            let stats = col.statistics().unwrap().unwrap();
            let col_stats: ColumnStatistics = stats.as_ref().into();
            columns.insert(
                col.descriptor().path_in_schema.get(0).unwrap().clone(),
                col_stats,
            );
        }

        TableStatistics { columns }
    }
}

fn read_parquet(uri: &str, io_config: Arc<IOConfig>) -> DaftResult<()> {
let runtime_handle = daft_io::get_runtime(true)?;
let io_client = daft_io::get_io_client(true, io_config)?;
let metadata = runtime_handle.block_on(async move {
read_parquet_metadata(uri, io_client).await
})?;

for rg in &metadata.row_groups {
let table_stats: TableStatistics = rg.into();
println!("{table_stats:?}");
}
Ok(())
}

#[cfg(test)]
mod test {
use common_error::DaftResult;
use daft_core::{
array::ops::DaftCompare,
datatypes::{Int32Array, Int64Array},
IntoSeries, Series,
};
use daft_dsl::{col, lit};
use daft_io::IOConfig;
use daft_table::Table;

use crate::column_stats::TruthValue;

use super::{ColumnStatistics, TableStatistics};

    #[test]
    fn test_pq() -> crate::Result<()> {
        let url = "/Users/sammy/daft_200MB_lineitem_chunk.RG-2.parquet";
        // WIP: ignore the result for now; read_parquet only prints per-row-group stats.
        let _ = super::read_parquet(url, IOConfig::default().into());

        Ok(())
    }
}
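Taken together, the `From` impls above turn parquet2's per-row-group statistics into `TableStatistics`, which is what makes row-group pruning possible. A rough sketch of how a caller might use this, relying on `eval_expression` from table_stats/mod.rs (next file) and a hypothetical `as_truth_value` helper that this commit does not include:

```rust
use daft_dsl::Expr;

use crate::{column_stats::TruthValue, table_stats::TableStatistics};

// Sketch only: keep the row groups whose statistics cannot prove the
// predicate false. `as_truth_value` is an assumed stand-in for however
// ColumnStatistics collapses to a TruthValue.
fn row_groups_to_read(
    metadata: &parquet2::metadata::FileMetaData,
    predicate: &Expr,
) -> crate::Result<Vec<usize>> {
    let mut keep = Vec::new();
    for (idx, rg) in metadata.row_groups.iter().enumerate() {
        let stats: TableStatistics = rg.into();
        let evaluated = stats.eval_expression(predicate)?;
        // Only a provably false predicate lets us skip the row group.
        if evaluated.as_truth_value() != TruthValue::False {
            keep.push(idx);
        }
    }
    Ok(keep)
}
```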
8 changes: 4 additions & 4 deletions src/daft-micropartition/src/table_stats/mod.rs
@@ -8,9 +8,9 @@ use crate::column_stats::ColumnStatistics;

use daft_core::array::ops::{DaftCompare, DaftLogical};

-#[derive(Clone)]
-pub struct TableStatistics {
-    columns: IndexMap<String, ColumnStatistics>,
+#[derive(Clone, Debug)]
+pub(crate) struct TableStatistics {
+    pub columns: IndexMap<String, ColumnStatistics>,
}
impl TableStatistics {
fn from_table(table: &Table) -> Self {
@@ -25,7 +25,7 @@ impl TableStatistics {
}

impl TableStatistics {
-    pub fn eval_expression(&self, expr: &Expr) -> crate::Result<ColumnStatistics> {
+    pub(crate) fn eval_expression(&self, expr: &Expr) -> crate::Result<ColumnStatistics> {
match expr {
Expr::Alias(col, _) => self.eval_expression(col.as_ref()),
Expr::Column(col) => Ok(self.columns.get(col.as_ref()).unwrap().clone()),
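With `columns` now `pub`, a `TableStatistics` can be assembled by hand, and `eval_expression` resolves an `Expr::Column` by a plain map lookup (an `Alias` just recurses into its child). A small sketch under those assumptions, written as if it lives inside this crate:

```rust
use daft_dsl::col;
use indexmap::IndexMap;

use crate::{column_stats::ColumnStatistics, table_stats::TableStatistics};

// Sketch: resolve a column reference against hand-built statistics.
fn lookup_example(x_stats: ColumnStatistics) -> crate::Result<()> {
    let mut columns = IndexMap::new();
    columns.insert("x".to_string(), x_stats);
    let stats = TableStatistics { columns };
    // col("x") builds Expr::Column("x"), which eval_expression looks up
    // directly in the IndexMap.
    let resolved = stats.eval_expression(&col("x"))?;
    println!("{resolved:?}");
    Ok(())
}
```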
4 changes: 2 additions & 2 deletions src/daft-parquet/src/file.rs
@@ -23,8 +23,8 @@ use crate::{
use arrow2::io::parquet::read::column_iter_to_arrays;

pub(crate) struct ParquetReaderBuilder {
-    uri: String,
-    metadata: parquet2::metadata::FileMetaData,
+    pub uri: String,
+    pub metadata: parquet2::metadata::FileMetaData,
selected_columns: Option<HashSet<String>>,
row_start_offset: usize,
num_rows: usize,
5 changes: 3 additions & 2 deletions src/daft-parquet/src/metadata.rs
@@ -2,7 +2,8 @@ use std::sync::Arc;

use daft_io::IOClient;

-use parquet2::{metadata::FileMetaData, read::deserialize_metadata};
+use parquet2::read::deserialize_metadata;
+pub use parquet2::metadata::{FileMetaData, RowGroupMetaData};
use snafu::ResultExt;

use crate::{Error, JoinSnafu, UnableToParseMetadataSnafu};
@@ -11,7 +12,7 @@ fn metadata_len(buffer: &[u8], len: usize) -> i32 {
i32::from_le_bytes(buffer[len - 8..len - 4].try_into().unwrap())
}

-pub async fn read_parquet_metadata(
+pub(crate) async fn read_parquet_metadata(
uri: &str,
size: usize,
io_client: Arc<IOClient>,
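For context on the unchanged `metadata_len` helper shown above: a Parquet file ends with the serialized footer metadata, a 4-byte little-endian length, and the 4-byte `PAR1` magic, so the length field always sits 8 bytes from the end of the file. A self-contained check of that slice arithmetic:

```rust
// A Parquet file tail is laid out as:
//   [ ...metadata... ][ i32 little-endian metadata length ][ b"PAR1" ]
fn main() {
    let mut tail = vec![0u8; 16]; // stand-in for the end of a file buffer
    let len = tail.len();
    let meta_len: i32 = 1234;
    tail[len - 8..len - 4].copy_from_slice(&meta_len.to_le_bytes());
    tail[len - 4..].copy_from_slice(b"PAR1");
    // Same slice arithmetic as metadata_len() above.
    let parsed = i32::from_le_bytes(tail[len - 8..len - 4].try_into().unwrap());
    assert_eq!(parsed, 1234);
}
```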
14 changes: 10 additions & 4 deletions src/daft-parquet/src/read.rs
@@ -500,6 +500,12 @@ pub fn read_parquet_schema(
Schema::try_from(builder.build()?.arrow_schema().as_ref())
}

+pub async fn read_parquet_metadata(
+    uri: &str,
+    io_client: Arc<IOClient>,
+) -> DaftResult<parquet2::metadata::FileMetaData> {
+    let builder = ParquetReaderBuilder::from_uri(uri, io_client).await?;
+    Ok(builder.metadata)
+}

pub fn read_parquet_statistics(uris: &Series, io_client: Arc<IOClient>) -> DaftResult<Table> {
let runtime_handle = get_runtime(true)?;
let _rt_guard = runtime_handle.enter();
@@ -520,10 +526,10 @@ pub fn read_parquet_statistics(uris: &Series, io_client: Arc<IOClient>) -> DaftResult<Table> {
let owned_client = io_client.clone();
tokio::spawn(async move {
if let Some(owned_string) = owned_string {
-            let builder = ParquetReaderBuilder::from_uri(&owned_string, owned_client).await?;
-            let num_rows = builder.metadata().num_rows;
-            let num_row_groups = builder.metadata().row_groups.len();
-            let version_num = builder.metadata().version;
+            let metadata = read_parquet_metadata(&owned_string, owned_client).await?;
+            let num_rows = metadata.num_rows;
+            let num_row_groups = metadata.row_groups.len();
+            let version_num = metadata.version;

Ok((Some(num_rows), Some(num_row_groups), Some(version_num)))
} else {
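`read_parquet_statistics` now fetches only the footer through the new `read_parquet_metadata` instead of constructing a full `ParquetReaderBuilder`. A hedged sketch of calling the new helper from synchronous code, mirroring the runtime/client pattern used in micropartition.rs (the URI is illustrative):

```rust
use std::sync::Arc;

use common_error::DaftResult;
use daft_io::IOConfig;

// Sketch: block on the async metadata fetch from sync code.
fn print_parquet_summary(uri: &str, io_config: Arc<IOConfig>) -> DaftResult<()> {
    let runtime_handle = daft_io::get_runtime(true)?;
    let io_client = daft_io::get_io_client(true, io_config)?;
    let metadata = runtime_handle
        .block_on(async move { daft_parquet::read::read_parquet_metadata(uri, io_client).await })?;
    println!(
        "rows: {}, row groups: {}, version: {}",
        metadata.num_rows,
        metadata.row_groups.len(),
        metadata.version,
    );
    Ok(())
}
```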
