[FEAT] Support Reading Iceberg Merge-on-Read Position Deletes (#2563)
When position deletes are present, I chose to disable row-group splitting, since supporting it would complicate the implementation and it's unlikely that people are storing a small number of very large individual files in Iceberg. If there is a clean way to do this, I wouldn't be opposed to implementing it, though.
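A minimal sketch of the mechanism this feature targets, assuming the Iceberg position-delete model: each position-delete file maps a data-file path to the row positions to drop, and rows at those positions are filtered out of the read. The names below are illustrative and not part of this codebase.

```rust
use std::collections::{HashMap, HashSet};

// Illustrative only: drop the rows of one data file whose positions appear in
// a position-delete map, as described by the Iceberg spec.
fn apply_position_deletes<T>(rows: Vec<T>, deleted_positions: &[i64]) -> Vec<T> {
    let deleted: HashSet<i64> = deleted_positions.iter().copied().collect();
    rows.into_iter()
        .enumerate()
        .filter(|(i, _)| !deleted.contains(&(*i as i64)))
        .map(|(_, row)| row)
        .collect()
}

fn main() {
    // Hypothetical delete map: data-file URI -> positions deleted in that file.
    let mut delete_map: HashMap<String, Vec<i64>> = HashMap::new();
    delete_map.insert("s3://bucket/data/file_a.parquet".to_string(), vec![0, 2]);

    let rows = vec!["row0", "row1", "row2"];
    let kept = apply_position_deletes(rows, &delete_map["s3://bucket/data/file_a.parquet"]);
    assert_eq!(kept, vec!["row1"]);
}
```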

This PR also does a slight refactor of the `DataFileSource` enum to consolidate `AnonymousDataFile` and `CatalogDataFile` and to remove parameters that aren't relevant to database sources. In a future PR, I would like to refactor our `ScanTask` struct to enforce certain relationships between data sources and file formats. For example, a database format does not make sense with a Parquet source, but we implicitly allow it right now. This has already made parts of our code messy and forced unnecessary checks.
kevinzwang authored Jul 27, 2024
1 parent 4701290 commit c24635e
Showing 15 changed files with 397 additions and 177 deletions.
1 change: 1 addition & 0 deletions daft/daft.pyi
@@ -718,6 +718,7 @@ class ScanTask:
storage_config: StorageConfig,
num_rows: int | None,
size_bytes: int | None,
iceberg_delete_files: list[str] | None,
pushdowns: Pushdowns | None,
partition_values: PyTable | None,
stats: PyTable | None,
1 change: 1 addition & 0 deletions daft/delta_lake/delta_lake_scan.py
@@ -187,6 +187,7 @@ def to_scan_tasks(self, pushdowns: Pushdowns) -> Iterator[ScanTask]:
num_rows=record_count,
storage_config=self._storage_config,
size_bytes=size_bytes,
iceberg_delete_files=None,
pushdowns=pushdowns,
partition_values=partition_values,
stats=stats._table if stats is not None else None,
1 change: 1 addition & 0 deletions daft/hudi/hudi_scan.py
@@ -118,6 +118,7 @@ def to_scan_tasks(self, pushdowns: Pushdowns) -> Iterator[ScanTask]:
storage_config=self._storage_config,
num_rows=record_count,
size_bytes=size_bytes,
iceberg_delete_files=None,
pushdowns=pushdowns,
partition_values=partition_values,
stats=stats,
4 changes: 2 additions & 2 deletions daft/iceberg/iceberg_scan.py
@@ -167,8 +167,7 @@ def to_scan_tasks(self, pushdowns: Pushdowns) -> Iterator[ScanTask]:
# TODO: Support ORC and AVRO when we can read it
raise NotImplementedError(f"{file_format} for iceberg not implemented!")

if len(task.delete_files) > 0:
raise NotImplementedError("Iceberg Merge-on-Read currently not supported, please make an issue!")
iceberg_delete_files = [f.file_path for f in task.delete_files]

# TODO: Thread in Statistics to each ScanTask: P2
pspec = self._iceberg_record_to_partition_spec(self._table.specs()[file.spec_id], file.partition)
@@ -179,6 +178,7 @@ def to_scan_tasks(self, pushdowns: Pushdowns) -> Iterator[ScanTask]:
num_rows=record_count,
storage_config=self._storage_config,
size_bytes=file.file_size_in_bytes,
iceberg_delete_files=iceberg_delete_files,
pushdowns=pushdowns,
partition_values=pspec._table if pspec is not None else None,
stats=None,
130 changes: 121 additions & 9 deletions src/daft-micropartition/src/micropartition.rs
@@ -1,11 +1,11 @@
use std::collections::{BTreeMap, HashSet};
use std::collections::{BTreeMap, HashMap, HashSet};
use std::fmt::Display;
use std::sync::Arc;
use std::{ops::Deref, sync::Mutex};

use arrow2::io::parquet::read::schema::infer_schema_with_options;
use common_error::DaftResult;
use daft_core::datatypes::Field;
use daft_core::datatypes::{Field, Int64Array, Utf8Array};
use daft_core::schema::{Schema, SchemaRef};

use daft_core::DataType;
@@ -17,7 +17,7 @@ use daft_parquet::read::{
};
use daft_scan::file_format::{CsvSourceConfig, FileFormatConfig, ParquetSourceConfig};
use daft_scan::storage_config::{NativeStorageConfig, StorageConfig};
use daft_scan::{ChunkSpec, DataFileSource, Pushdowns, ScanTask};
use daft_scan::{ChunkSpec, DataSource, Pushdowns, ScanTask};
use daft_table::Table;

use parquet2::metadata::FileMetaData;
@@ -134,8 +134,34 @@ fn materialize_scan_task(
}) => {
let inference_options =
ParquetSchemaInferenceOptions::new(Some(*coerce_int96_timestamp_unit));

// TODO: This is a hardcoded magic value but should be configurable
let num_parallel_tasks = 8;

let urls = urls.collect::<Vec<_>>();

// Create vec of all unique delete files in the scan task
let iceberg_delete_files = scan_task
.sources
.iter()
.flat_map(|s| s.get_iceberg_delete_files())
.flatten()
.map(String::as_str)
.collect::<HashSet<_>>()
.into_iter()
.collect::<Vec<_>>();

let delete_map = _read_delete_files(
iceberg_delete_files.as_slice(),
urls.as_slice(),
io_client.clone(),
io_stats.clone(),
num_parallel_tasks,
runtime_handle.clone(),
&inference_options,
)
.context(DaftCoreComputeSnafu)?;

let row_groups = parquet_sources_to_row_groups(scan_task.sources.as_slice());
let metadatas = scan_task
.sources
@@ -151,11 +177,12 @@
scan_task.pushdowns.filters.clone(),
io_client.clone(),
io_stats,
8,
num_parallel_tasks,
runtime_handle,
&inference_options,
field_id_mapping.clone(),
metadatas,
Some(delete_map),
)
.context(DaftCoreComputeSnafu)?
}
@@ -366,7 +393,7 @@ fn materialize_scan_task(
let table_iterators = scan_task.sources.iter().map(|source| {
// Call Python function to create an Iterator (Grabs the GIL and then releases it)
match source {
DataFileSource::PythonFactoryFunction {
DataSource::PythonFactoryFunction {
module,
func_name,
func_args,
@@ -593,11 +620,19 @@ impl MicroPartition {

let row_groups = parquet_sources_to_row_groups(scan_task.sources.as_slice());

let mut iceberg_delete_files: HashSet<&str> = HashSet::new();
for source in scan_task.sources.iter() {
if let Some(delete_files) = source.get_iceberg_delete_files() {
iceberg_delete_files.extend(delete_files.iter().map(String::as_str));
}
}

read_parquet_into_micropartition(
uris.as_slice(),
columns.as_deref(),
None,
scan_task.pushdowns.limit,
Some(iceberg_delete_files.into_iter().collect()),
row_groups,
scan_task.pushdowns.filters.clone(),
scan_task.partition_spec(),
@@ -777,7 +812,7 @@ fn prune_fields_from_schema(
}
}

fn parquet_sources_to_row_groups(sources: &[DataFileSource]) -> Option<Vec<Option<Vec<i64>>>> {
fn parquet_sources_to_row_groups(sources: &[DataSource]) -> Option<Vec<Option<Vec<i64>>>> {
let row_groups = sources
.iter()
.map(|s| {
@@ -918,6 +953,59 @@ fn _get_file_column_names<'a>(
}
}

fn _read_delete_files(
delete_files: &[&str],
uris: &[&str],
io_client: Arc<IOClient>,
io_stats: Option<IOStatsRef>,
num_parallel_tasks: usize,
runtime_handle: Arc<tokio::runtime::Runtime>,
schema_infer_options: &ParquetSchemaInferenceOptions,
) -> DaftResult<HashMap<String, Vec<i64>>> {
let columns: Option<&[&str]> = Some(&["file_path", "pos"]);

let tables = read_parquet_bulk(
delete_files,
columns,
None,
None,
None,
None,
io_client,
io_stats,
num_parallel_tasks,
runtime_handle,
schema_infer_options,
None,
None,
None,
)?;

let mut delete_map: HashMap<String, Vec<i64>> =
uris.iter().map(|uri| (uri.to_string(), vec![])).collect();

for table in tables.iter() {
// values in the file_path column are guaranteed by the iceberg spec to match the full URI of the corresponding data file
// https://iceberg.apache.org/spec/#position-delete-files
let file_paths = table.get_column("file_path")?.downcast::<Utf8Array>()?;
let positions = table.get_column("pos")?.downcast::<Int64Array>()?;

for i in 0..table.len() {
let file = file_paths.get(i);
let pos = positions.get(i);

if let Some(file) = file
&& let Some(pos) = pos
&& delete_map.contains_key(file)
{
delete_map.get_mut(file).unwrap().push(pos);
}
}
}

Ok(delete_map)
}

#[allow(clippy::too_many_arguments)]
fn _read_parquet_into_loaded_micropartition(
io_client: Arc<IOClient>,
@@ -926,6 +1014,7 @@ fn _read_parquet_into_loaded_micropartition(
columns: Option<&[&str]>,
start_offset: Option<usize>,
num_rows: Option<usize>,
iceberg_delete_files: Option<Vec<&str>>,
row_groups: Option<Vec<Option<Vec<i64>>>>,
predicate: Option<ExprRef>,
partition_spec: Option<&PartitionSpec>,
@@ -935,6 +1024,20 @@
catalog_provided_schema: Option<SchemaRef>,
field_id_mapping: Option<Arc<BTreeMap<i32, Field>>>,
) -> DaftResult<MicroPartition> {
let delete_map = iceberg_delete_files
.map(|files| {
_read_delete_files(
files.as_slice(),
uris,
io_client.clone(),
io_stats.clone(),
num_parallel_tasks,
runtime_handle.clone(),
schema_infer_options,
)
})
.transpose()?;

let file_column_names = _get_file_column_names(columns, partition_spec);
let all_tables = read_parquet_bulk(
uris,
@@ -950,6 +1053,7 @@
schema_infer_options,
field_id_mapping,
None,
delete_map,
)?;

// Prefer using the `catalog_provided_schema` but fall back onto inferred schema from Parquet files
@@ -987,6 +1091,7 @@ pub(crate) fn read_parquet_into_micropartition(
columns: Option<&[&str]>,
start_offset: Option<usize>,
num_rows: Option<usize>,
iceberg_delete_files: Option<Vec<&str>>,
row_groups: Option<Vec<Option<Vec<i64>>>>,
predicate: Option<ExprRef>,
partition_spec: Option<&PartitionSpec>,
@@ -1011,16 +1116,21 @@
let runtime_handle = daft_io::get_runtime(multithreaded_io)?;
let io_client = daft_io::get_io_client(multithreaded_io, io_config.clone())?;

// If we have a predicate then we no longer have an accurate accounting of required metadata
// If we have a predicate or deletion files then we no longer have an accurate accounting of required metadata
// on the MicroPartition (e.g. its length). Hence we need to perform an eager read.
if predicate.is_some() {
if iceberg_delete_files
.as_ref()
.map_or(false, |files| !files.is_empty())
|| predicate.is_some()
{
return _read_parquet_into_loaded_micropartition(
io_client,
runtime_handle,
uris,
columns,
start_offset,
num_rows,
iceberg_delete_files,
row_groups,
predicate,
partition_spec,
@@ -1142,10 +1252,11 @@
.clone()
.unwrap_or_else(|| std::iter::repeat(None).take(uris.len()).collect()),
)
.map(|((url, metadata), rgs)| DataFileSource::AnonymousDataFile {
.map(|((url, metadata), rgs)| DataSource::File {
path: url,
chunk_spec: rgs.map(ChunkSpec::Parquet),
size_bytes: Some(size_bytes),
iceberg_delete_files: None,
metadata: None,
partition_spec: partition_spec.cloned(),
statistics: None,
@@ -1194,6 +1305,7 @@ pub(crate) fn read_parquet_into_micropartition(
columns,
start_offset,
num_rows,
iceberg_delete_files,
row_groups,
predicate,
partition_spec,
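The delete map built by `_read_delete_files` is passed into `read_parquet_bulk`, but this diff does not show how the positions are applied during the read. As a rough sketch of the idea, under that assumption and not reflecting the actual daft-parquet code: the positions for a data file become a keep-mask over the rows read from it. Because this PR disables row-group splitting when deletes are present, the whole file is read and positions can be used directly without an offset.

```rust
// Minimal sketch, not the daft-parquet implementation: build a keep-mask for
// one data file from its delete positions. Positions beyond the rows actually
// read are skipped here; the PR adds a ParquetDeleteRowOutOfIndex error for
// that case in the real code path.
fn keep_mask(delete_positions: &[i64], num_read_rows: usize) -> Vec<bool> {
    let mut mask = vec![true; num_read_rows];
    for &pos in delete_positions {
        if (0..num_read_rows as i64).contains(&pos) {
            mask[pos as usize] = false;
        }
    }
    mask
}

fn main() {
    // Positions 1 and 3 of this data file were deleted by a position-delete file.
    assert_eq!(keep_mask(&[1, 3], 5), vec![true, false, true, false, true]);
}
```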
2 changes: 2 additions & 0 deletions src/daft-micropartition/src/python.rs
@@ -593,6 +593,7 @@ impl PyMicroPartition {
columns.as_deref(),
start_offset,
num_rows,
None,
row_groups.map(|rg| vec![Some(rg)]),
predicate.map(|e| e.expr),
None,
@@ -637,6 +638,7 @@ impl PyMicroPartition {
columns.as_deref(),
start_offset,
num_rows,
None,
row_groups,
predicate.map(|e| e.expr),
None,
18 changes: 15 additions & 3 deletions src/daft-parquet/src/lib.rs
@@ -121,14 +121,14 @@ pub enum Error {
},

#[snafu(display(
"Parquet file: {} metadata listed {} rows but only read: {} ",
"Parquet file: {} expected {} rows but only read: {} ",
path,
metadata_num_rows,
expected_rows,
read_rows
))]
ParquetNumRowMismatch {
path: String,
metadata_num_rows: usize,
expected_rows: usize,
read_rows: usize,
},

@@ -150,6 +150,18 @@ pub enum Error {
read_columns: usize,
},

#[snafu(display(
"Parquet file: {} attempted to delete row at position {} but only read {} rows",
path,
row,
read_rows
))]
ParquetDeleteRowOutOfIndex {
path: String,
row: usize,
read_rows: usize,
},

#[snafu(display(
"Parquet file: {} unable to convert row group metadata to stats\nDetails:\n{source}",
path,
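A tiny sketch of the check the new `ParquetDeleteRowOutOfIndex` variant implies; the actual check lives elsewhere in daft-parquet and is not part of this hunk, and the function below is hypothetical.

```rust
// Hypothetical illustration of the bounds check behind ParquetDeleteRowOutOfIndex:
// a delete position beyond the rows actually read is an error, not a silent no-op.
fn check_delete_position(path: &str, row: usize, read_rows: usize) -> Result<(), String> {
    if row >= read_rows {
        return Err(format!(
            "Parquet file: {path} attempted to delete row at position {row} but only read {read_rows} rows"
        ));
    }
    Ok(())
}

fn main() {
    assert!(check_delete_position("data.parquet", 2, 5).is_ok());
    assert!(check_delete_position("data.parquet", 10, 5).is_err());
}
```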
1 change: 1 addition & 0 deletions src/daft-parquet/src/python.rs
@@ -171,6 +171,7 @@ pub mod pylib {
&schema_infer_options,
None,
None,
None,
)?
.into_iter()
.map(|v| v.into())