
[FEAT][1/2] Support Iceberg renaming of columns #1937

Closed · 25 commits

Conversation
Conversation

@jaychia (Contributor) commented Feb 21, 2024

Summary

Support field_id-based renaming of Parquet columns along the following codepath:

  1. IcebergScanOperator generates ScanTasks, each containing a field_id_mapping: Arc<{i32: Field}>
  2. The mapping is propagated to workers through the ScanWithTask instruction object
  3. MicroPartitions are created with MicroPartition::from_scan_task
  4. This then calls into read_parquet_into_micropartition:
    a. If statistics are available, it creates an unloaded MicroPartition by building a new ScanTask (hydrated with statistics) and then calling MicroPartition::new_unloaded(new_scan_task).
    b. Otherwise, it falls back to read_parquet_bulk, which has been modified to correctly handle field_id_mapping.

This PR ensures that when data/statistics are read from Parquet files, we correctly apply renaming according to field_id_mapping.
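As a rough sketch of what this renaming looks like at read time — the types and names below are simplified stand-ins, not Daft's actual definitions:

```rust
use std::collections::BTreeMap;
use std::sync::Arc;

// Stand-in for Daft's Field; the real type also carries a dtype.
#[derive(Clone, Debug, PartialEq)]
struct Field {
    name: String,
}

/// Rename fields read from a Parquet file according to an Iceberg
/// field_id mapping: a field whose id appears in the mapping takes the
/// mapped name, and unmapped fields keep their Parquet name.
fn apply_field_id_mapping(
    parquet_fields: Vec<(i32, Field)>, // (field_id from Parquet metadata, field)
    field_id_mapping: &Arc<BTreeMap<i32, Field>>,
) -> Vec<Field> {
    parquet_fields
        .into_iter()
        .map(|(field_id, mut field)| {
            if let Some(mapped) = field_id_mapping.get(&field_id) {
                field.name = mapped.name.clone();
            }
            field
        })
        .collect()
}

fn main() {
    let mapping = Arc::new(BTreeMap::from([(1, Field { name: "renamed_col".into() })]));
    let fields = vec![(1, Field { name: "old_col".into() }), (2, Field { name: "other".into() })];
    let renamed = apply_field_id_mapping(fields, &mapping);
    assert_eq!(renamed[0].name, "renamed_col");
    assert_eq!(renamed[1].name, "other"); // no mapping entry: name unchanged
}
```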

Reviewer Notes

A lot of the errors caught/triggered by this PR have to do with mismatches between the fields (names/metadata) on our schemas and on our Series objects.

Keeping those two in sync is fairly challenging with the way our code is currently structured.

The approach taken to try and fix this is:

  1. Try to use the same logic for field_id renaming across Series and Schemas
  2. When reading data from Parquet -> arrow2 -> Daft Series/Schema, perform a post-processing step to remove any field metadata that was retrieved from the Parquet files.

However, I do think that this is a fairly error-prone situation. I'm not sure what the best approach is, though.
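To make step 2 concrete, here is a minimal sketch of that post-processing step over a stand-in field type (the real code operates on arrow2 fields, and the metadata key shown is only an assumed example):

```rust
use std::collections::BTreeMap;

// Stand-in for an arrow2-style field carrying a metadata map.
#[derive(Clone, Debug, PartialEq)]
struct ArrowField {
    name: String,
    metadata: BTreeMap<String, String>,
}

/// Post-processing after Parquet -> arrow2 -> Daft: drop any field metadata
/// picked up from the Parquet file so that fields on the Schema and on
/// Series compare equal again.
fn strip_parquet_metadata(mut field: ArrowField) -> ArrowField {
    field.metadata.clear();
    field
}

fn main() {
    let field = ArrowField {
        name: "col".into(),
        // "PARQUET:field_id" is the conventional Arrow key; assumed here.
        metadata: BTreeMap::from([("PARQUET:field_id".into(), "1".into())]),
    };
    assert!(strip_parquet_metadata(field).metadata.is_empty());
}
```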

Drive-By

Refactors to clean-up MicroPartitions/ScanTasks and schemas:

  1. Refactored MicroPartition::new_unloaded: it no longer accepts a schema argument; instead internally it will just use the ScanTask's .materialized_schema()
  2. Refactored read_parquet_into_micropartition to significantly reduce code duplication

Remaining todos:

  • Fix logic with column pruning (need to apply column pruning after applying the field ID mappings; see the sketch after this list)
  • Perform correct renaming for statistics parsing from Parquet metadata
  • Perform recursive renaming for Series and for Schema
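For the first todo, a toy sketch of why that ordering matters: pushed-down column names come from the query plan, i.e. they are post-rename names, so pruning only matches if the field ID mapping has been applied first (all names below are made up):

```rust
/// Prune columns by the pushed-down (user-facing, post-rename) names.
fn prune_columns(names: Vec<String>, pushdown: &[&str]) -> Vec<String> {
    names.into_iter().filter(|n| pushdown.contains(&n.as_str())).collect()
}

fn main() {
    // The Parquet file stores the column as "old_col"; Iceberg renamed it
    // to "new_col", and the plan prunes by the renamed name.
    let before_rename = vec!["old_col".to_string(), "other".to_string()];
    let after_rename = vec!["new_col".to_string(), "other".to_string()];

    // Pruning before the rename silently drops the column...
    assert!(prune_columns(before_rename, &["new_col"]).is_empty());
    // ...while pruning after the rename keeps it.
    assert_eq!(prune_columns(after_rename, &["new_col"]), vec!["new_col"]);
}
```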

@github-actions bot added the enhancement (New feature or request) label on Feb 21, 2024

codecov bot commented Feb 21, 2024

Codecov Report

Attention: Patch coverage is 0%, with 4 lines in your changes missing coverage. Please review.

Project coverage is 84.65%. Comparing base (3e0e334) to head (715a9e6).
Report is 1 commit behind head on main.

Additional details and impacted files


@@            Coverage Diff             @@
##             main    #1937      +/-   ##
==========================================
- Coverage   84.68%   84.65%   -0.03%     
==========================================
  Files          57       57              
  Lines        6293     6295       +2     
==========================================
  Hits         5329     5329              
- Misses        964      966       +2     
Files                          Coverage Δ
daft/iceberg/iceberg_scan.py   0.00% <0.00%> (ø)

@jaychia force-pushed the jay/fd-add-rename-test branch from 598ab84 to 56562e5 on February 21, 2024 at 19:57
src/daft-table/src/lib.rs (outdated review thread, resolved)

Cargo.toml (outdated)
@@ -112,10 +112,11 @@ tokio-util = "0.7.8"
url = "2.4.0"

[workspace.dependencies.arrow2]
# branch = "daft-fork"
# TODO: Update this to daft-fork
# branch = "jay/fd-add-rename-test"
@jaychia (Contributor, Author) commented:
Change was added here to populate field_ids in the inferred arrow2 Field's metadata
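For reference, a sketch of the effect of that fork change, written against arrow2's Field API; the exact metadata key the fork writes is an assumption here ("PARQUET:field_id" is the Arrow convention):

```rust
use std::collections::BTreeMap;

use arrow2::datatypes::{DataType, Field, Metadata};

fn main() {
    // After the fork change, schema inference attaches the Parquet field_id
    // to the inferred Field's metadata, roughly like this:
    let metadata: Metadata = BTreeMap::from([("PARQUET:field_id".to_string(), "2".to_string())]);
    let field = Field::new("col", DataType::Int64, true).with_metadata(metadata);
    assert_eq!(
        field.metadata.get("PARQUET:field_id").map(String::as_str),
        Some("2")
    );
}
```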

@jaychia (Contributor, Author) commented:
Once this PR is approved, I'll update this to point to a rebased daft-fork again

@jaychia force-pushed the jay/fd-add-rename-test branch from 615dd6c to 412ffbc on February 24, 2024 at 08:09
@@ -129,3 +129,16 @@ impl Display for Field {
write!(f, "{}#{}", self.name, self.dtype)
}
}

impl PartialEq for Field {
@jaychia (Contributor, Author) commented:
I implemented a custom PartialEq and Hash for Field because we were getting a bunch of issues with Schema::eq in both Rust and Python, now that our arrow2 reader is propagating field_id into the metadata field.

Not sure if this is the best idea though. Reviewers should feel free to comment!
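The shape of the change, sketched against a stand-in type (the real Field presumably also compares dtypes; the point here is only that metadata is excluded from equality and hashing):

```rust
use std::collections::BTreeMap;
use std::hash::{Hash, Hasher};

// Stand-in for Daft's Field; the real type also carries a dtype.
#[derive(Clone, Debug)]
struct Field {
    name: String,
    metadata: BTreeMap<String, String>,
}

// Equality and hashing ignore metadata, so a field that picked up a
// field_id from Parquet still equals its metadata-free counterpart.
impl PartialEq for Field {
    fn eq(&self, other: &Self) -> bool {
        self.name == other.name
    }
}

impl Eq for Field {}

impl Hash for Field {
    fn hash<H: Hasher>(&self, state: &mut H) {
        self.name.hash(state);
    }
}

fn main() {
    let plain = Field { name: "col".into(), metadata: BTreeMap::new() };
    let tagged = Field {
        name: "col".into(),
        metadata: BTreeMap::from([("PARQUET:field_id".into(), "1".into())]),
    };
    assert_eq!(plain, tagged); // metadata no longer breaks Schema equality
}
```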

@jaychia changed the title from "[FEAT] Support Iceberg renaming of columns" to "[FEAT][1/2] Support Iceberg renaming of columns" on Feb 27, 2024
@jaychia force-pushed the jay/fd-add-rename-test branch twice, most recently from 71e5471 to 079ef11 on March 4, 2024 at 19:41
@jaychia force-pushed the jay/fd-add-rename-test branch from 079ef11 to 570764a on March 4, 2024 at 20:02
Adds support for renaming of nested columns (columns renamed under
structs and lists)

**Reviewers to note: this is a follow-on PR to #1937**

---------

Co-authored-by: Jay Chia <[email protected]@users.noreply.github.com>
@@ -693,6 +708,68 @@ pub(crate) fn read_json_into_micropartition(
}
}

#[allow(clippy::too_many_arguments)]
fn _read_parquet_into_loaded_micropartition(
@jaychia (Contributor, Author) commented:
This is for code deduplication in read_parquet_into_micropartition

self.metadata.clone(),
pruned_statistics.expect("Unloaded MicroPartition should have statistics"),
)),
TableState::Unloaded(scan_task) => {
@jaychia (Contributor, Author) commented:
An Unloaded MicroPartition's schema is actually just defined by its ScanTask's schema, so we should be replacing the ScanTask's schema directly.
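A toy illustration of that point, with simplified stand-ins for the daft-micropartition / daft-scan types:

```rust
#[derive(Clone, Debug, PartialEq)]
struct Schema(Vec<String>); // just column names, for illustration

#[derive(Clone, Debug)]
struct ScanTask {
    path: String,
    schema: Schema,
}

enum TableState {
    // Nothing has been read yet: the schema lives on the ScanTask.
    Unloaded(ScanTask),
}

/// Renaming an unloaded micropartition rewrites the schema on its ScanTask;
/// there are no materialized tables to touch.
fn rename_unloaded(state: TableState, new_schema: Schema) -> TableState {
    match state {
        TableState::Unloaded(task) => TableState::Unloaded(ScanTask {
            schema: new_schema,
            ..task
        }),
    }
}

fn main() {
    let state = TableState::Unloaded(ScanTask {
        path: "s3://bucket/file.parquet".into(),
        schema: Schema(vec!["old_col".into()]),
    });
    let TableState::Unloaded(task) = rename_unloaded(state, Schema(vec!["new_col".into()]));
    assert_eq!(task.schema, Schema(vec!["new_col".into()]));
}
```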

@@ -95,6 +102,214 @@ where
}
}

fn resolve_dtype_recursively(
@jaychia (Contributor, Author) commented:
The resolve_*_recursively code is pretty messy. Happy to take on work to convert it into a Visitor pattern if reviewers think it's necessary.
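For context, the rough shape of that recursive resolution over a toy dtype (Daft's real DataType has many more variants; this only shows why nesting forces recursion, and why a Visitor would centralize the traversal):

```rust
// Toy stand-in for Daft's DataType, reduced to the nesting cases.
#[derive(Clone, Debug, PartialEq)]
enum DataType {
    Int64,
    List(Box<DataType>),
    Struct(Vec<(String, DataType)>), // (field name, child dtype)
}

/// Recursively resolve a dtype, renaming struct fields with a caller-supplied
/// function; lists recurse into their child type, leaves pass through.
fn resolve_dtype_recursively(dtype: DataType, rename: &impl Fn(&str) -> String) -> DataType {
    match dtype {
        DataType::Int64 => DataType::Int64,
        DataType::List(child) => {
            DataType::List(Box::new(resolve_dtype_recursively(*child, rename)))
        }
        DataType::Struct(fields) => DataType::Struct(
            fields
                .into_iter()
                .map(|(name, child)| (rename(&name), resolve_dtype_recursively(child, rename)))
                .collect(),
        ),
    }
}

fn main() {
    let dtype = DataType::List(Box::new(DataType::Struct(vec![(
        "old_col".to_string(),
        DataType::Int64,
    )])));
    let renamed = resolve_dtype_recursively(dtype, &|_| "new_col".to_string());
    let expected =
        DataType::List(Box::new(DataType::Struct(vec![("new_col".to_string(), DataType::Int64)])));
    assert_eq!(renamed, expected);
}
```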

@jaychia (Contributor, Author) commented Mar 8, 2024:

Closing in favor of a better approach in #1990

@jaychia closed this on Mar 8, 2024