[FEAT][1/2] Support Iceberg renaming of columns #1937
Conversation
Codecov Report
Attention: Patch coverage is

Additional details and impacted files:

@@            Coverage Diff             @@
##             main    #1937      +/-  ##
==========================================
- Coverage   84.68%   84.65%   -0.03%
==========================================
  Files          57       57
  Lines        6293     6295       +2
==========================================
  Hits         5329     5329
- Misses        964      966       +2
Force-pushed from 598ab84 to 56562e5
Cargo.toml (Outdated)

@@ -112,10 +112,11 @@ tokio-util = "0.7.8"
url = "2.4.0"

[workspace.dependencies.arrow2]
# branch = "daft-fork"
# TODO: Update this to daft-fork
# branch = "jay/fd-add-rename-test"
Change was added here to populate field_ids in the inferred arrow2 Field's metadata
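For context, a minimal sketch of what that fork change might look like, assuming arrow2's public `Field.metadata` map; the `PARQUET:field_id` metadata key is an assumption borrowed from the convention used by other Arrow implementations, not something confirmed by this diff:

```rust
use arrow2::datatypes::{DataType, Field};

// Hypothetical illustration: when inferring an arrow2 Field from a Parquet
// schema, carry the Parquet field_id along in the Field's metadata map so
// downstream readers can rename columns by id.
fn field_with_field_id(name: &str, dtype: DataType, field_id: i32) -> Field {
    let mut field = Field::new(name, dtype, true);
    field
        .metadata
        .insert("PARQUET:field_id".to_string(), field_id.to_string());
    field
}
```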
Once this PR is approved, I'll update this to point to a rebased daft-fork again.
Force-pushed from 615dd6c to 412ffbc
src/daft-core/src/datatypes/field.rs (Outdated)

@@ -129,3 +129,16 @@ impl Display for Field {
        write!(f, "{}#{}", self.name, self.dtype)
    }
}

impl PartialEq for Field {
I implemented a custom PartialEq and Hash for Field because we were getting a bunch of issues with Schema::eq in both Rust and Python, now that our arrow2 reader is propagating field_id into the metadata field. Not sure if this is the best idea though. Reviewers should feel free to comment!
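A minimal sketch of the idea being described, using a simplified stand-in for the real daft-core `Field` (the actual type has more members than shown here):

```rust
use std::collections::BTreeMap;
use std::hash::{Hash, Hasher};

// Simplified stand-in for the daft-core Field.
struct Field {
    name: String,
    dtype: String, // stand-in for the real DataType
    metadata: BTreeMap<String, String>,
}

// Equality deliberately ignores `metadata`, so two schemas that differ only
// in Parquet-derived field_id metadata still compare equal.
impl PartialEq for Field {
    fn eq(&self, other: &Self) -> bool {
        self.name == other.name && self.dtype == other.dtype
    }
}

impl Eq for Field {}

// Hash must stay consistent with PartialEq: hash exactly the members used in eq.
impl Hash for Field {
    fn hash<H: Hasher>(&self, state: &mut H) {
        self.name.hash(state);
        self.dtype.hash(state);
    }
}
```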
Force-pushed from 71e5471 to 079ef11
In Iceberg, tables are projected using field IDs. Even if a column is renamed (and Iceberg is lazy, so the underlying files are not rewritten), a read should still resolve to the original column via its field ID.
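A hedged sketch of that lookup, with invented names: given the field_id found in a Parquet file and the table's current mapping, reads resolve to the table's current column name (the real mapping in this PR stores whole Field objects, not just names):

```rust
use std::collections::BTreeMap;

// field_id -> current Iceberg column name (simplified).
fn resolve_column_name(
    field_id: i32,
    parquet_name: &str,
    field_id_mapping: &BTreeMap<i32, String>,
) -> String {
    field_id_mapping
        .get(&field_id)
        .cloned()
        // Unmapped ids fall back to the name stored in the file itself.
        .unwrap_or_else(|| parquet_name.to_string())
}

fn main() {
    let mapping = BTreeMap::from([(2, "renamed_col".to_string())]);
    // The file still stores the old name, but field_id 2 resolves to the new one.
    assert_eq!(resolve_column_name(2, "old_col", &mapping), "renamed_col");
    assert_eq!(resolve_column_name(3, "untouched_col", &mapping), "untouched_col");
}
```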
Force-pushed from 079ef11 to 570764a
Adds support for renaming of nested columns (columns renamed under structs and lists). **Reviewers to note: this is a follow-on PR to #1937.**

Co-authored-by: Jay Chia <[email protected]@users.noreply.github.com>
@@ -693,6 +708,68 @@ pub(crate) fn read_json_into_micropartition(
    }
}

#[allow(clippy::too_many_arguments)]
fn _read_parquet_into_loaded_micropartition(
This is for code deduplication in read_parquet_into_micropartition
    self.metadata.clone(),
    pruned_statistics.expect("Unloaded MicroPartition should have statistics"),
)),
TableState::Unloaded(scan_task) => {
An Unloaded MicroPartition's schema is actually just defined by its ScanTask's schema, so we should be replacing the ScanTask's schema directly.
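A toy illustration of that point, with simplified stand-ins for the real daft-micropartition types: swapping the schema on an unloaded partition means rebuilding its ScanTask, not touching any data.

```rust
use std::sync::Arc;

// Simplified stand-ins for the types under discussion.
struct Schema(Vec<String>);
struct ScanTask {
    schema: Arc<Schema>,
}

enum TableState {
    Unloaded(Arc<ScanTask>),
    Loaded(Vec<String>), // stand-in for materialized tables
}

// Because an unloaded MicroPartition's schema is whatever its ScanTask says,
// changing the schema means building a replacement ScanTask.
fn with_schema(state: TableState, new_schema: Arc<Schema>) -> TableState {
    match state {
        TableState::Unloaded(_old_task) => {
            TableState::Unloaded(Arc::new(ScanTask { schema: new_schema }))
        }
        // Loaded tables would instead be cast/renamed in place (not shown).
        TableState::Loaded(tables) => TableState::Loaded(tables),
    }
}
```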
@@ -95,6 +102,214 @@ where
    }
}

fn resolve_dtype_recursively(
The resolve_*_recursively code is pretty messy. Happy to take on work to convert it into a Visitor pattern if reviewers think it's necessary.
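For reference, a toy model of what such recursive resolution looks like; the `DType` enum and its variants are illustrative stand-ins, not the real daft-core definitions:

```rust
use std::collections::BTreeMap;

// Walk a nested dtype and rename struct fields via a field_id -> name map.
#[derive(Clone, Debug, PartialEq)]
enum DType {
    Int64,
    List(Box<DType>),
    Struct(Vec<(i32, String, DType)>), // (field_id, name, child dtype)
}

fn resolve_dtype_recursively(dtype: &DType, mapping: &BTreeMap<i32, String>) -> DType {
    match dtype {
        DType::Int64 => DType::Int64,
        // Lists recurse into their element type.
        DType::List(inner) => DType::List(Box::new(resolve_dtype_recursively(inner, mapping))),
        // Structs rename each child by field_id, then recurse into it.
        DType::Struct(fields) => DType::Struct(
            fields
                .iter()
                .map(|(id, name, child)| {
                    let new_name = mapping.get(id).cloned().unwrap_or_else(|| name.clone());
                    (*id, new_name, resolve_dtype_recursively(child, mapping))
                })
                .collect(),
        ),
    }
}
```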
Closing in favor of the better approach in #1990.
Summary

Support field_id renaming of Parquet files along the codepath (a sketch of the field_id_mapping shape follows this list):

1. IcebergScanOperator creates ScanTasks, each containing the field_id_mapping: Arc<{i32: Field}>
2. The ScanTasks are executed via the ScanWithTask instruction object
3. MicroPartition::from_scan_task calls read_parquet_into_micropartition:
   a. If statistics are available, it will create an unloaded MicroPartition by creating a new ScanTask (hydrated with statistics) and then calling MicroPartition::new_unloaded(new_scan_task).
   b. Otherwise, it falls back into read_parquet_bulk, which has been modified to correctly handle field_id_mapping.

This PR ensures that when data/statistics are read from Parquet files, we correctly apply renaming according to field_id_mapping.
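A hedged sketch of the field_id_mapping shape described in step 1, rendering the `Arc<{i32: Field}>` notation as a Rust type; the `Field` here is a simplified stand-in for the daft-core type:

```rust
use std::collections::BTreeMap;
use std::sync::Arc;

#[derive(Clone)]
struct Field {
    name: String,
}

// The ScanTask carries a shared map from Iceberg field_id to the table's
// current Field definition.
struct ScanTask {
    field_id_mapping: Arc<BTreeMap<i32, Field>>,
}

fn main() {
    // A table whose column with field_id 1 is currently named "renamed_col".
    let mapping = Arc::new(BTreeMap::from([(
        1,
        Field { name: "renamed_col".to_string() },
    )]));
    let task = ScanTask { field_id_mapping: mapping };
    // Downstream Parquet reads consult this map to rename columns on the fly.
    assert_eq!(task.field_id_mapping.get(&1).unwrap().name, "renamed_col");
}
```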
.Reviewer Notes
A lot of the errors caught/triggered by this PR have to do with mismatches between the fields (names/metadata) on our schemas and on our Series objects. Keeping those two in sync is fairly challenging with the way our code is currently structured. The approach taken to try and fix this is: along the read path Parquet -> arrow2 -> Daft Series/Schema, perform a post-processing step to remove any field metadata that was retrieved from the Parquet files (see the sketch below). However, I do think that this is a fairly error-prone situation. Not sure what the best approach is though.
Drive-By

Refactors to clean up MicroPartitions/ScanTasks and schemas:

- MicroPartition::new_unloaded: it no longer accepts a schema argument; instead it will internally just use the ScanTask's .materialized_schema()
- Refactored read_parquet_into_micropartition to significantly reduce code duplication

Remaining todos: