[FEAT] Iceberg MOR for streaming parquet (#2975)
This PR implements Iceberg merge-on-read (MOR) for streaming Parquet reads.

Notes:
- Since reading Iceberg delete files is a bulk operation, the native
executor uses the `read_parquet_bulk` function to read them. However, the
native executor is already running inside a Tokio runtime, so it needs to
run `read_parquet_bulk` on the IO pool. This PR refactors
`read_parquet_bulk` so that it can be run in this manner; a conceptual
sketch of the hand-off pattern is shown below.
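
A minimal, self-contained sketch of that hand-off (an illustration of the general pattern, not Daft's actual code; the dedicated `io_runtime` and the stand-in `bulk_read` are assumptions in place of `get_io_client_and_runtime` and `read_parquet_bulk_async`):

```rust
use std::sync::OnceLock;

use tokio::runtime::Runtime;

// A separate multi-threaded runtime reserved for IO-heavy bulk work
// (illustrative; Daft obtains its IO runtime via `get_io_client_and_runtime`).
fn io_runtime() -> &'static Runtime {
    static IO_RUNTIME: OnceLock<Runtime> = OnceLock::new();
    IO_RUNTIME.get_or_init(|| {
        tokio::runtime::Builder::new_multi_thread()
            .enable_all()
            .build()
            .expect("failed to build IO runtime")
    })
}

// Stand-in for an async bulk read such as `read_parquet_bulk_async`.
async fn bulk_read(paths: Vec<String>) -> Vec<String> {
    paths
}

// Runs on the executor's runtime: spawn the bulk read onto the IO runtime and
// await its JoinHandle instead of blocking an executor worker thread.
async fn executor_task(paths: Vec<String>) -> Vec<String> {
    io_runtime()
        .spawn(bulk_read(paths))
        .await
        .expect("IO task panicked")
}

#[tokio::main]
async fn main() {
    let out = executor_task(vec!["s3://bucket/pos-deletes.parquet".to_string()]).await;
    println!("read {} delete file(s)", out.len());
}
```

The point is that the executor task awaits a handle to work spawned on the separate IO runtime rather than running the bulk read on one of its own worker threads.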

---------

Co-authored-by: Colin Ho <[email protected]>
Co-authored-by: Colin Ho <[email protected]>
3 people authored Oct 14, 2024
1 parent c694c9e commit fe10f8c
Showing 6 changed files with 243 additions and 94 deletions.
5 changes: 5 additions & 0 deletions .github/workflows/python-package.yml
@@ -468,6 +468,10 @@ jobs:
matrix:
python-version: ['3.8'] # can't use 3.7 due to requiring anon mode for adlfs
daft-runner: [py, ray]
enable-native-executor: [0, 1]
exclude:
- daft-runner: ray
enable-native-executor: 1
steps:
- uses: actions/checkout@v4
with:
@@ -513,6 +517,7 @@ jobs:
pytest tests/integration/iceberg -m 'integration' --durations=50
env:
DAFT_RUNNER: ${{ matrix.daft-runner }}
DAFT_ENABLE_NATIVE_EXECUTOR: ${{ matrix.enable-native-executor }}
- name: Send Slack notification on failure
uses: slackapi/[email protected]
if: ${{ failure() && (github.ref == 'refs/heads/main') }}
9 changes: 7 additions & 2 deletions src/daft-local-execution/src/lib.rs
@@ -14,16 +14,21 @@ lazy_static! {
pub static ref NUM_CPUS: usize = std::thread::available_parallelism().unwrap().get();
}

pub(crate) type TaskSet<T> = tokio::task::JoinSet<T>;
pub(crate) fn create_task_set<T>() -> TaskSet<T> {
tokio::task::JoinSet::new()
}

pub struct ExecutionRuntimeHandle {
worker_set: tokio::task::JoinSet<crate::Result<()>>,
worker_set: TaskSet<crate::Result<()>>,
default_morsel_size: usize,
}

impl ExecutionRuntimeHandle {
#[must_use]
pub fn new(default_morsel_size: usize) -> Self {
Self {
worker_set: tokio::task::JoinSet::new(),
worker_set: create_task_set(),
default_morsel_size,
}
}
127 changes: 105 additions & 22 deletions src/daft-local-execution/src/sources/scan_task.rs
@@ -1,12 +1,16 @@
use std::sync::Arc;
use std::{
collections::{HashMap, HashSet},
sync::Arc,
};

use common_error::DaftResult;
use common_file_formats::{FileFormatConfig, ParquetSourceConfig};
use daft_core::prelude::{AsArrow, Int64Array, Utf8Array};
use daft_csv::{CsvConvertOptions, CsvParseOptions, CsvReadOptions};
use daft_io::IOStatsRef;
use daft_json::{JsonConvertOptions, JsonParseOptions, JsonReadOptions};
use daft_micropartition::MicroPartition;
use daft_parquet::read::ParquetSchemaInferenceOptions;
use daft_parquet::read::{read_parquet_bulk_async, ParquetSchemaInferenceOptions};
use daft_scan::{storage_config::StorageConfig, ChunkSpec, ScanTask};
use futures::{Stream, StreamExt};
use snafu::ResultExt;
@@ -16,7 +20,7 @@ use tracing::instrument;
use crate::{
channel::{create_channel, Sender},
sources::source::{Source, SourceStream},
ExecutionRuntimeHandle,
ExecutionRuntimeHandle, JoinSnafu, TaskSet, NUM_CPUS,
};

pub struct ScanTaskSource {
@@ -38,9 +42,11 @@ impl ScanTaskSource {
sender: Sender<Arc<MicroPartition>>,
maintain_order: bool,
io_stats: IOStatsRef,
delete_map: Option<Arc<HashMap<String, Vec<i64>>>>,
) -> DaftResult<()> {
let schema = scan_task.materialized_schema();
let mut stream = stream_scan_task(scan_task, Some(io_stats), maintain_order).await?;
let mut stream =
stream_scan_task(scan_task, Some(io_stats), delete_map, maintain_order).await?;
let mut has_data = false;
while let Some(partition) = stream.next().await {
let _ = sender.send(partition?).await;
@@ -77,17 +83,28 @@ impl Source for ScanTaskSource {
vec![receiver],
)
};
for (scan_task, sender) in self.scan_tasks.iter().zip(senders) {
runtime_handle.spawn(
Self::process_scan_task_stream(
scan_task.clone(),
sender,
maintain_order,
io_stats.clone(),
),
self.name(),
);
}
let scan_tasks = self.scan_tasks.clone();
runtime_handle.spawn(
async move {
let mut task_set = TaskSet::new();
let delete_map = get_delete_map(&scan_tasks).await?.map(Arc::new);
for (scan_task, sender) in scan_tasks.into_iter().zip(senders) {
task_set.spawn(Self::process_scan_task_stream(
scan_task,
sender,
maintain_order,
io_stats.clone(),
delete_map.clone(),
));
}
while let Some(result) = task_set.join_next().await {
result.context(JoinSnafu)??;
}
Ok(())
},
self.name(),
);

let stream = futures::stream::iter(receivers.into_iter().map(ReceiverStream::new));
Ok(Box::pin(stream.flatten()))
}
@@ -97,9 +114,80 @@ impl Source for ScanTaskSource {
}
}

// Read all iceberg delete files and return a map of file paths to delete positions
async fn get_delete_map(
scan_tasks: &[Arc<ScanTask>],
) -> DaftResult<Option<HashMap<String, Vec<i64>>>> {
let delete_files = scan_tasks
.iter()
.flat_map(|st| {
st.sources
.iter()
.filter_map(|source| source.get_iceberg_delete_files())
.flatten()
.cloned()
})
.collect::<HashSet<_>>();
if delete_files.is_empty() {
return Ok(None);
}

let (runtime, io_client) = scan_tasks
.first()
.unwrap() // Safe to unwrap because we checked that the list is not empty
.storage_config
.get_io_client_and_runtime()?;
let scan_tasks = scan_tasks.to_vec();
runtime.block_on_io_pool(async move {
let mut delete_map = scan_tasks
.iter()
.flat_map(|st| st.sources.iter().map(|s| s.get_path().to_string()))
.map(|path| (path, vec![]))
.collect::<std::collections::HashMap<_, _>>();
let columns_to_read = Some(vec!["file_path".to_string(), "pos".to_string()]);
let result = read_parquet_bulk_async(
delete_files.into_iter().collect(),
columns_to_read,
None,
None,
None,
None,
io_client,
None,
*NUM_CPUS,
ParquetSchemaInferenceOptions::new(None),
None,
None,
None,
None,
)
.await?;

for table_result in result {
let table = table_result?;
// values in the file_path column are guaranteed by the iceberg spec to match the full URI of the corresponding data file
// https://iceberg.apache.org/spec/#position-delete-files
let file_paths = table.get_column("file_path")?.downcast::<Utf8Array>()?;
let positions = table.get_column("pos")?.downcast::<Int64Array>()?;

for (file, pos) in file_paths
.as_arrow()
.values_iter()
.zip(positions.as_arrow().values_iter())
{
if let Some(positions) = delete_map.get_mut(file) {
positions.push(*pos);
}
}
}
Ok(Some(delete_map))
})?
}

async fn stream_scan_task(
scan_task: Arc<ScanTask>,
io_stats: Option<IOStatsRef>,
delete_map: Option<Arc<HashMap<String, Vec<i64>>>>,
maintain_order: bool,
) -> DaftResult<impl Stream<Item = DaftResult<Arc<MicroPartition>>> + Send> {
let pushdown_columns = scan_task.pushdowns.columns.as_ref().map(|v| {
@@ -159,12 +247,7 @@ async fn stream_scan_task(
let inference_options =
ParquetSchemaInferenceOptions::new(Some(*coerce_int96_timestamp_unit));

if source.get_iceberg_delete_files().is_some() {
return Err(common_error::DaftError::TypeError(
"Streaming reads not supported for Iceberg delete files".to_string(),
));
}

let delete_rows = delete_map.as_ref().and_then(|m| m.get(url).cloned());
let row_groups = if let Some(ChunkSpec::Parquet(row_groups)) = source.get_chunk_spec() {
Some(row_groups.clone())
} else {
@@ -177,7 +260,6 @@
daft_parquet::read::stream_parquet(
url,
file_column_names.as_deref(),
None,
scan_task.pushdowns.limit,
row_groups,
scan_task.pushdowns.filters.clone(),
Expand All @@ -187,6 +269,7 @@ async fn stream_scan_task(
field_id_mapping.clone(),
metadata,
maintain_order,
delete_rows,
)
.await?
}
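For context on what the `delete_rows` value threaded through `stream_parquet` represents, here is a conceptual sketch (not Daft's reader code) of turning a file's delete positions into a per-batch keep mask; `row_offset`, `num_rows`, and the mask-based filtering are illustrative assumptions:

```rust
use std::collections::HashSet;

// Given the delete positions recorded for one data file and a streamed batch
// that starts at file-level row `row_offset`, build a keep/drop mask.
fn keep_mask(delete_rows: Option<&[i64]>, row_offset: i64, num_rows: usize) -> Vec<bool> {
    let deleted: HashSet<i64> = delete_rows
        .map(|rows| rows.iter().copied().collect())
        .unwrap_or_default();
    // Row `i` of the batch corresponds to position `row_offset + i` in the file,
    // which is what the `pos` column of an Iceberg position-delete file refers to.
    (0..num_rows)
        .map(|i| !deleted.contains(&(row_offset + i as i64)))
        .collect()
}

fn main() {
    // File positions 1 and 3 are deleted; this batch covers positions 2..=5.
    let mask = keep_mask(Some(&[1, 3]), 2, 4);
    assert_eq!(mask, vec![true, false, true, true]);
}
```

Collecting the positions into a `HashSet` keeps the per-row check constant-time regardless of how many deletes a file carries.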
3 changes: 3 additions & 0 deletions src/daft-parquet/src/file.rs
@@ -398,6 +398,7 @@ impl ParquetFileReader {
predicate: Option<ExprRef>,
original_columns: Option<Vec<String>>,
original_num_rows: Option<usize>,
delete_rows: Option<Vec<i64>>,
) -> DaftResult<BoxStream<'static, DaftResult<Table>>> {
let daft_schema = Arc::new(daft_core::prelude::Schema::try_from(
self.arrow_schema.as_ref(),
@@ -426,6 +427,7 @@ impl ParquetFileReader {
let ranges = ranges.clone();
let predicate = predicate.clone();
let original_columns = original_columns.clone();
let delete_rows = delete_rows.clone();
let row_range = *row_range;

tokio::task::spawn(async move {
@@ -515,6 +517,7 @@ impl ParquetFileReader {
predicate,
original_columns,
original_num_rows,
delete_rows,
);
if table_iter.is_none() {
let table =