Skip to content

Commit

Permalink
Reduce clone of Statistics in ListingTable and PartitionedFile (a…
Browse files Browse the repository at this point in the history
…pache#11802)

* reduce clone of `Statistics` by using arc.

* optimize `get_statistics_with_limit` and `split_files`.

* directly create the col stats set.

* fix pb.

* fix fmt.

* fix clippy.

* fix compile.

* remove stale codes.

* optimize `split_files` by using drain.

* remove default for PartitionedFile.

* don't keep `Arc<Statistic>` in `PartitionedFile`.

* fix pb.
  • Loading branch information
Rachelint authored Aug 6, 2024
1 parent 16a3557 commit bddb641
Show file tree
Hide file tree
Showing 4 changed files with 107 additions and 97 deletions.
21 changes: 17 additions & 4 deletions datafusion/core/src/datasource/listing/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
//! Helper functions for the table implementation
use std::collections::HashMap;
use std::mem;
use std::sync::Arc;

use super::PartitionedFile;
Expand Down Expand Up @@ -138,10 +139,22 @@ pub fn split_files(

// effectively this is div with rounding up instead of truncating
let chunk_size = (partitioned_files.len() + n - 1) / n;
partitioned_files
.chunks(chunk_size)
.map(|c| c.to_vec())
.collect()
let mut chunks = Vec::with_capacity(n);
let mut current_chunk = Vec::with_capacity(chunk_size);
for file in partitioned_files.drain(..) {
current_chunk.push(file);
if current_chunk.len() == chunk_size {
let full_chunk =
mem::replace(&mut current_chunk, Vec::with_capacity(chunk_size));
chunks.push(full_chunk);
}
}

if !current_chunk.is_empty() {
chunks.push(current_chunk)
}

chunks
}

struct Partition {
Expand Down
1 change: 1 addition & 0 deletions datafusion/core/src/datasource/listing/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ pub struct PartitionedFile {
/// An optional field for user defined per object metadata
pub extensions: Option<Arc<dyn std::any::Any + Send + Sync>>,
}

impl PartitionedFile {
/// Create a simple file without metadata or partition
pub fn new(path: impl Into<String>, size: u64) -> Self {
Expand Down
26 changes: 14 additions & 12 deletions datafusion/core/src/datasource/listing/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -973,15 +973,16 @@ impl ListingTable {
// collect the statistics if required by the config
let files = file_list
.map(|part_file| async {
let mut part_file = part_file?;
let part_file = part_file?;
if self.options.collect_stat {
let statistics =
self.do_collect_statistics(ctx, &store, &part_file).await?;
part_file.statistics = Some(statistics.clone());
Ok((part_file, statistics)) as Result<(PartitionedFile, Statistics)>
Ok((part_file, statistics))
} else {
Ok((part_file, Statistics::new_unknown(&self.file_schema)))
as Result<(PartitionedFile, Statistics)>
Ok((
part_file,
Arc::new(Statistics::new_unknown(&self.file_schema)),
))
}
})
.boxed()
Expand Down Expand Up @@ -1011,12 +1012,12 @@ impl ListingTable {
ctx: &SessionState,
store: &Arc<dyn ObjectStore>,
part_file: &PartitionedFile,
) -> Result<Statistics> {
let statistics_cache = self.collected_statistics.clone();
return match statistics_cache
) -> Result<Arc<Statistics>> {
match self
.collected_statistics
.get_with_extra(&part_file.object_meta.location, &part_file.object_meta)
{
Some(statistics) => Ok(statistics.as_ref().clone()),
Some(statistics) => Ok(statistics.clone()),
None => {
let statistics = self
.options
Expand All @@ -1028,14 +1029,15 @@ impl ListingTable {
&part_file.object_meta,
)
.await?;
statistics_cache.put_with_extra(
let statistics = Arc::new(statistics);
self.collected_statistics.put_with_extra(
&part_file.object_meta.location,
statistics.clone().into(),
statistics.clone(),
&part_file.object_meta,
);
Ok(statistics)
}
};
}
}
}

Expand Down
156 changes: 75 additions & 81 deletions datafusion/core/src/datasource/statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
// specific language governing permissions and limitations
// under the License.

use std::mem;
use std::sync::Arc;

use super::listing::PartitionedFile;
use crate::arrow::datatypes::{Schema, SchemaRef};
use crate::error::Result;
Expand All @@ -26,16 +29,14 @@ use datafusion_common::stats::Precision;
use datafusion_common::ScalarValue;

use futures::{Stream, StreamExt};
use itertools::izip;
use itertools::multiunzip;

/// Get all files as well as the file level summary statistics (no statistic for partition columns).
/// If the optional `limit` is provided, includes only sufficient files. Needed to read up to
/// `limit` number of rows. `collect_stats` is passed down from the configuration parameter on
/// `ListingTable`. If it is false we only construct bare statistics and skip a potentially expensive
/// call to `multiunzip` for constructing file level summary statistics.
pub async fn get_statistics_with_limit(
all_files: impl Stream<Item = Result<(PartitionedFile, Statistics)>>,
all_files: impl Stream<Item = Result<(PartitionedFile, Arc<Statistics>)>>,
file_schema: SchemaRef,
limit: Option<usize>,
collect_stats: bool,
Expand All @@ -48,26 +49,27 @@ pub async fn get_statistics_with_limit(
// - zero for summations, and
// - neutral element for extreme points.
let size = file_schema.fields().len();
let mut null_counts: Vec<Precision<usize>> = vec![Precision::Absent; size];
let mut max_values: Vec<Precision<ScalarValue>> = vec![Precision::Absent; size];
let mut min_values: Vec<Precision<ScalarValue>> = vec![Precision::Absent; size];
let mut col_stats_set = vec![ColumnStatistics::default(); size];
let mut num_rows = Precision::<usize>::Absent;
let mut total_byte_size = Precision::<usize>::Absent;

// Fusing the stream allows us to call next safely even once it is finished.
let mut all_files = Box::pin(all_files.fuse());

if let Some(first_file) = all_files.next().await {
let (file, file_stats) = first_file?;
let (mut file, file_stats) = first_file?;
file.statistics = Some(file_stats.as_ref().clone());
result_files.push(file);

// First file, we set them directly from the file statistics.
num_rows = file_stats.num_rows;
total_byte_size = file_stats.total_byte_size;
for (index, file_column) in file_stats.column_statistics.into_iter().enumerate() {
null_counts[index] = file_column.null_count;
max_values[index] = file_column.max_value;
min_values[index] = file_column.min_value;
num_rows = file_stats.num_rows.clone();
total_byte_size = file_stats.total_byte_size.clone();
for (index, file_column) in
file_stats.column_statistics.clone().into_iter().enumerate()
{
col_stats_set[index].null_count = file_column.null_count;
col_stats_set[index].max_value = file_column.max_value;
col_stats_set[index].min_value = file_column.min_value;
}

// If the number of rows exceeds the limit, we can stop processing
Expand All @@ -80,7 +82,8 @@ pub async fn get_statistics_with_limit(
};
if conservative_num_rows <= limit.unwrap_or(usize::MAX) {
while let Some(current) = all_files.next().await {
let (file, file_stats) = current?;
let (mut file, file_stats) = current?;
file.statistics = Some(file_stats.as_ref().clone());
result_files.push(file);
if !collect_stats {
continue;
Expand All @@ -90,38 +93,28 @@ pub async fn get_statistics_with_limit(
// counts across all the files in question. If any file does not
// provide any information or provides an inexact value, we demote
// the statistic precision to inexact.
num_rows = add_row_stats(file_stats.num_rows, num_rows);
num_rows = add_row_stats(file_stats.num_rows.clone(), num_rows);

total_byte_size =
add_row_stats(file_stats.total_byte_size, total_byte_size);
add_row_stats(file_stats.total_byte_size.clone(), total_byte_size);

(null_counts, max_values, min_values) = multiunzip(
izip!(
file_stats.column_statistics.into_iter(),
null_counts.into_iter(),
max_values.into_iter(),
min_values.into_iter()
)
.map(
|(
ColumnStatistics {
null_count: file_nc,
max_value: file_max,
min_value: file_min,
distinct_count: _,
},
null_count,
max_value,
min_value,
)| {
(
add_row_stats(file_nc, null_count),
set_max_if_greater(file_max, max_value),
set_min_if_lesser(file_min, min_value),
)
},
),
);
for (file_col_stats, col_stats) in file_stats
.column_statistics
.iter()
.zip(col_stats_set.iter_mut())
{
let ColumnStatistics {
null_count: file_nc,
max_value: file_max,
min_value: file_min,
distinct_count: _,
} = file_col_stats;

col_stats.null_count =
add_row_stats(file_nc.clone(), col_stats.null_count.clone());
set_max_if_greater(file_max, &mut col_stats.max_value);
set_min_if_lesser(file_min, &mut col_stats.min_value)
}

// If the number of rows exceeds the limit, we can stop processing
// files. This only applies when we know the number of rows. It also
Expand All @@ -139,7 +132,7 @@ pub async fn get_statistics_with_limit(
let mut statistics = Statistics {
num_rows,
total_byte_size,
column_statistics: get_col_stats_vec(null_counts, max_values, min_values),
column_statistics: col_stats_set,
};
if all_files.next().await.is_some() {
// If we still have files in the stream, it means that the limit kicked
Expand Down Expand Up @@ -182,21 +175,6 @@ fn add_row_stats(
}
}

pub(crate) fn get_col_stats_vec(
null_counts: Vec<Precision<usize>>,
max_values: Vec<Precision<ScalarValue>>,
min_values: Vec<Precision<ScalarValue>>,
) -> Vec<ColumnStatistics> {
izip!(null_counts, max_values, min_values)
.map(|(null_count, max_value, min_value)| ColumnStatistics {
null_count,
max_value,
min_value,
distinct_count: Precision::Absent,
})
.collect()
}

pub(crate) fn get_col_stats(
schema: &Schema,
null_counts: Vec<Precision<usize>>,
Expand Down Expand Up @@ -238,45 +216,61 @@ fn min_max_aggregate_data_type(input_type: &DataType) -> &DataType {
/// If the given value is numerically greater than the original maximum value,
/// return the new maximum value with appropriate exactness information.
fn set_max_if_greater(
max_nominee: Precision<ScalarValue>,
max_values: Precision<ScalarValue>,
) -> Precision<ScalarValue> {
match (&max_values, &max_nominee) {
(Precision::Exact(val1), Precision::Exact(val2)) if val1 < val2 => max_nominee,
max_nominee: &Precision<ScalarValue>,
max_value: &mut Precision<ScalarValue>,
) {
match (&max_value, max_nominee) {
(Precision::Exact(val1), Precision::Exact(val2)) if val1 < val2 => {
*max_value = max_nominee.clone();
}
(Precision::Exact(val1), Precision::Inexact(val2))
| (Precision::Inexact(val1), Precision::Inexact(val2))
| (Precision::Inexact(val1), Precision::Exact(val2))
if val1 < val2 =>
{
max_nominee.to_inexact()
*max_value = max_nominee.clone().to_inexact();
}
(Precision::Exact(_), Precision::Absent) => {
let exact_max = mem::take(max_value);
*max_value = exact_max.to_inexact();
}
(Precision::Absent, Precision::Exact(_)) => {
*max_value = max_nominee.clone().to_inexact();
}
(Precision::Absent, Precision::Inexact(_)) => {
*max_value = max_nominee.clone();
}
(Precision::Exact(_), Precision::Absent) => max_values.to_inexact(),
(Precision::Absent, Precision::Exact(_)) => max_nominee.to_inexact(),
(Precision::Absent, Precision::Inexact(_)) => max_nominee,
(Precision::Absent, Precision::Absent) => Precision::Absent,
_ => max_values,
_ => {}
}
}

/// If the given value is numerically lesser than the original minimum value,
/// return the new minimum value with appropriate exactness information.
fn set_min_if_lesser(
min_nominee: Precision<ScalarValue>,
min_values: Precision<ScalarValue>,
) -> Precision<ScalarValue> {
match (&min_values, &min_nominee) {
(Precision::Exact(val1), Precision::Exact(val2)) if val1 > val2 => min_nominee,
min_nominee: &Precision<ScalarValue>,
min_value: &mut Precision<ScalarValue>,
) {
match (&min_value, min_nominee) {
(Precision::Exact(val1), Precision::Exact(val2)) if val1 > val2 => {
*min_value = min_nominee.clone();
}
(Precision::Exact(val1), Precision::Inexact(val2))
| (Precision::Inexact(val1), Precision::Inexact(val2))
| (Precision::Inexact(val1), Precision::Exact(val2))
if val1 > val2 =>
{
min_nominee.to_inexact()
*min_value = min_nominee.clone().to_inexact();
}
(Precision::Exact(_), Precision::Absent) => {
let exact_min = mem::take(min_value);
*min_value = exact_min.to_inexact();
}
(Precision::Absent, Precision::Exact(_)) => {
*min_value = min_nominee.clone().to_inexact();
}
(Precision::Absent, Precision::Inexact(_)) => {
*min_value = min_nominee.clone();
}
(Precision::Exact(_), Precision::Absent) => min_values.to_inexact(),
(Precision::Absent, Precision::Exact(_)) => min_nominee.to_inexact(),
(Precision::Absent, Precision::Inexact(_)) => min_nominee,
(Precision::Absent, Precision::Absent) => Precision::Absent,
_ => min_values,
_ => {}
}
}

0 comments on commit bddb641

Please sign in to comment.