Python: allow projection of Iceberg fields to pyarrow table schema with names #8144
Conversation
Force-pushed 3bcd1a3 to e303f1a
Tests ready, mypy checks fail due to the existing oddities.
Thanks for your great contribution @moriyoshi. It is really a nice feature to add!
I conducted some tests using an Iceberg table migrated from Delta Lake (no field ids in the data file), which was created via the following Spark DataFrame:

```scala
spark
  .range(0, 5, 1, 5)
  .withColumn("longCol", expr("id"))
  .withColumn("decimalCol", expr("CAST(longCol AS DECIMAL(10, 2))"))
  .withColumn("magic_number", expr("rand(5) * 100"))
  .withColumn("dateCol", date_add(current_date(), 1))
  .withColumn("dateString", expr("CAST(dateCol AS STRING)"))
  .withColumn("random1", expr("CAST(rand(5) * 100 as LONG)"))
  .withColumn("random2", expr("CAST(rand(51) * 100 as LONG)"))
  .withColumn("random3", expr("CAST(rand(511) * 100 as LONG)"))
  .withColumn("random4", expr("CAST(rand(15) * 100 as LONG)"))
  .withColumn("random5", expr("CAST(rand(115) * 100 as LONG)"))
  .withColumn("innerStruct1", expr("STRUCT(random1, random2)"))
  .withColumn("innerStruct2", expr("STRUCT(random3, random4)"))
  .withColumn("structCol1", expr("STRUCT(innerStruct1, innerStruct2)"))
  .withColumn(
    "innerStruct3",
    expr("STRUCT(SHA1(CAST(random5 AS BINARY)), SHA1(CAST(random1 AS BINARY)))"))
  .withColumn(
    "structCol2",
    expr(
      "STRUCT(innerStruct3, STRUCT(SHA1(CAST(random2 AS BINARY)), SHA1(CAST(random3 AS BINARY))))"))
  .withColumn("arrayCol", expr("ARRAY(random1, random2, random3, random4, random5)"))
  .withColumn("arrayStructCol", expr("ARRAY(innerStruct1, innerStruct1, innerStruct1)"))
  .withColumn("mapCol1", expr("MAP(structCol1, structCol2)"))
  .withColumn("mapCol2", expr("MAP(longCol, dateString)"))
  .withColumn("mapCol3", expr("MAP(dateCol, arrayCol)"))
  .withColumn("structCol3", expr("STRUCT(structCol2, mapCol3, arrayCol)"));
```
I'd like to highlight some areas where we could potentially improve.
- If we choose to filter out some nested field, `pyarrow_to_schema` will fail even with `ignore_unprojectable_fields = True`.
- Maybe we can let `pyarrow_to_schema` take the complete table schema rather than the projected schema. That way, we can focus on dealing with fields that are missing in the table schema and let `prune_columns` handle the projection (iceberg/python/pyiceberg/io/pyarrow.py, line 773 in 9116118):
  `file_project_schema = prune_columns(file_schema, projected_field_ids, select_full_types=False)`
- If we have a `MapType` whose key type is nested, `pyarrow_to_schema` will fail to switch to the correct inner schema when visiting the `MapType`'s value.

Please correct me if I misunderstand something.
@@ -360,6 +361,65 @@ def test_schema_to_pyarrow_schema(table_schema_nested: Schema) -> None:
    assert repr(actual) == expected


def test_pyarrow_to_schema(table_schema_simple: Schema, table_schema_nested: Schema) -> None:
Do you think it's a good idea to move these two tests to test_pyarrow_visitor.py (which contains other tests for `pyarrow_to_schema`)? We currently have too many tests in test_pyarrow.py. I think we may want to stop adding new tests to it and consider refactoring it into different files.
Yes, now that test_pyarrow.py contains some backend-specific tests, it makes more sense to move those visitor-related tests to another file.
) -> Schema:
    visitor = _ConvertToIceberg(projected_schema, match_with_field_name, ignore_unprojectable_fields)
    ib_schema = visit_pyarrow(schema, visitor)
    assert isinstance(ib_schema, StructType)
Could you please replace it with a `ValueError` or something similar? According to comments in other PRs, we try to avoid `assert` outside tests/.
This is not intended to warn the user about wrong usage. It is a type guard for mypy, and I believe it's valid.
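For context, the pattern being defended here looks roughly like this (the class names are illustrative stand-ins, not pyiceberg's actual types): an `assert isinstance(...)` lets mypy narrow a union-typed result without adding user-facing error handling.

```python
from typing import Union

class StructType:
    """Stand-in for an Iceberg struct type."""

class ListType:
    """Stand-in for an Iceberg list type."""

def as_struct(result: Union[StructType, ListType]) -> StructType:
    # Type guard: mypy narrows `result` to StructType after this line,
    # so the return type checks without a cast. It is not meant to
    # validate user input at runtime.
    assert isinstance(result, StructType)
    return result
```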
python/pyiceberg/io/pyarrow.py (outdated)
@@ -749,7 +869,16 @@ def _task_to_table(
    schema_raw = metadata.get(ICEBERG_SCHEMA)
    # TODO: if field_ids are not present, Name Mapping should be implemented to look them up in the table schema,
I think this TODO can be removed?
Right, considering the purpose of this PR :)
python/pyiceberg/io/pyarrow.py (outdated)
if not self.match_with_field_name:
    raise
if projected_field is None and self.match_with_field_name:
    projected_field = self.projected_schema.find_field(field.name)
[Question] Do we need the try...except and a case on `ignore_unprojectable_fields` here?
For example, I have a table with schema:
Schema, id=0
├── 1: name: optional struct<3: firstname: optional string, 4: middlename: optional string, 5: lastname: optional
│ string>
└── 2: address: optional struct<6: current: optional struct<8: state: optional string, 9: city: optional string>, 7:
previous: optional struct<10: state: optional string, 11: city: optional string>>
If I only query the `name` column:

catalog.load_table(table_name).scan(selected_fields=("name", )).to_pandas(match_with_field_name=True, ignore_unprojectable_fields=True)

I get the following error, raised by `find_field` here:

ValueError: Could not find field with name address, case_sensitive=True
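The failure mode can be sketched with a toy lookup (hypothetical names, not pyiceberg's actual API): if the name lookup raises before the `ignore_unprojectable_fields` flag is consulted, the whole scan fails even though the caller asked to skip unmatched fields.

```python
from typing import Dict, Optional

def resolve_field_id(
    name: str,
    projected_fields: Dict[str, int],  # field name -> field id in the projected schema
    ignore_unprojectable_fields: bool,
) -> Optional[int]:
    # Consult the flag before raising, so unmatched data-file columns
    # can be skipped instead of aborting the scan.
    field_id = projected_fields.get(name)
    if field_id is None:
        if ignore_unprojectable_fields:
            return None  # skip this column rather than failing the scan
        raise ValueError(f"Could not find field with name {name}, case_sensitive=True")
    return field_id
```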
This was exactly what was addressed in the comment, and fixed.
python/pyiceberg/io/pyarrow.py (outdated)
self.next = None
if not isinstance(field.type, (pa.StructType, pa.ListType, pa.MapType)):
    return
(self.projected_schema, self.next) = self.projected_schema_stack.pop()
I think we may want to step `self.next` forward (key -> value, value -> None) when popping from the stack, since in the `after_field` phase we have already visited the field specified by the `self.next` stored on the stack.
This can cause an error when we have a `MapType` whose key and value types are nested:
19: mapCol1: optional map<struct<47: innerStruct1: optional struct<49: random1: optional long, 50: random2:
│ optional long>, 48: innerStruct2: optional struct<51: random3: optional long, 52: random4: optional long>>,
│ struct<53: innerStruct3: optional struct<55: col1: optional string, 56: col2: optional string>, 54: col2: optional
│ struct<57: col1: optional string, 58: col2: optional string>>>
In this example, the visitor will use the key's projected schema to search for the value's name, resulting in:

ValueError: Could not find field with name innerStruct3, case_sensitive=True
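A minimal sketch of the bookkeeping being suggested (hypothetical names, not the actual visitor): when `after_field` pops a saved `(schema, slot)` frame for a map, the slot should step forward, so that a nested value is resolved against the value's schema rather than the key's.

```python
from typing import List, Optional, Tuple

# (projected-schema label, next map slot to visit: "key", "value", or None)
Frame = Tuple[str, Optional[str]]

def pop_and_advance(stack: List[Frame]) -> Frame:
    # The frame on the stack points at the field we have just finished
    # visiting, so step the slot forward: key -> value, value -> done.
    schema, slot = stack.pop()
    next_slot = {"key": "value", "value": None, None: None}[slot]
    return schema, next_slot
```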
The stack is used only when we hit a composite type during the traversal, so `self.next` is valid within such a type. If that part causes the problem described above, then either it doesn't work as intended or another part is the culprit, I guess. I've been trying to reproduce the problem with the same schema I believe you used to check the behavior, but have had no luck so far.
Finally I managed to reproduce the problem, and it turned out you are absolutely right! Thanks!
if schema_raw is not None
else pyarrow_to_schema(
    physical_schema,
    projected_schema,
I think we may want the full table schema here. `pyarrow_to_schema` is supposed to simply convert the physical schema to the Iceberg schema without handling column pruning. However, `projected_schema` only contains the columns selected in a table scan. If we use it during the conversion, we will have to ignore unselected columns, which I think is unnecessary and tricky to implement (and also inconsistent with the behavior when field ids are present).
It doesn't have much to do with pruning. What we need to achieve with `ignore_unprojectable_fields` here is simply to ignore redundant columns in the actual data, whereas the purpose of pruning is to take away fields that are already known according to the catalog. Those are similar, but have different semantics.
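The distinction can be illustrated with two toy operations (illustrative names, not the actual implementation): pruning keeps only the fields the scan selected, driven by the catalog's projected field ids, while ignoring drops data-file columns the table schema knows nothing about.

```python
from typing import Dict, List, Set

def prune_to_projection(converted: Dict[str, int], projected_ids: Set[int]) -> Dict[str, int]:
    # Pruning: keep only the fields the scan selected (known via the catalog).
    return {name: fid for name, fid in converted.items() if fid in projected_ids}

def ignore_unknown(file_fields: List[str], table_fields: Set[str]) -> List[str]:
    # Ignoring: drop data-file columns absent from the table schema entirely.
    return [name for name in file_fields if name in table_fields]
```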
Force-pushed e303f1a to 890df17
Sorry for leaving this off for a while; I've been quite busy. Let me try to answer the questions.
This was a bug to be addressed, and I just pushed the fix. Could you take a look?
Actually what
This should've been fixed along with 1.
Force-pushed 9eeacbf to 57af4a5
@JonasJ-ap Can you have a look at this again?
…th field names when field ids are not available in data files.
Force-pushed 57af4a5 to feffbbb
Hey @moriyoshi, thanks for creating this PR. Could you re-create it against the https://github.com/apache/iceberg-python repository? We're migrating the Python implementation to its own repo, and I overlooked this PR.
Fixes #7451.
Design note
In this PR, the following two new keyword arguments are introduced to `Table.to_pyarrow`, `Table.to_pandas`, and the like:

- `match_with_field_name` (bool): `True` instructs it to map fields between the data file (pyarrow) schema and the Iceberg schema by name when mapping by field ids is not feasible. `False` keeps the normal behavior.
- `ignore_unprojectable_fields` (bool): `True` instructs it to ignore fields that are present in the data file (pyarrow) schema but absent from the Iceberg schema.
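Taken together, the two flags behave roughly like this toy projection over plain dicts (a sketch of the intended semantics, not the pyiceberg implementation):

```python
from typing import Dict

def project_by_name(
    file_schema: Dict[str, str],     # column name -> type, as read from the data file
    iceberg_schema: Dict[str, str],  # column name -> type, from the table metadata
    match_with_field_name: bool,
    ignore_unprojectable_fields: bool,
) -> Dict[str, str]:
    # Without name matching, a file lacking field ids cannot be projected.
    if not match_with_field_name:
        raise ValueError("field ids are unavailable and name matching is disabled")
    projected = {}
    for name, dtype in file_schema.items():
        if name not in iceberg_schema:
            if ignore_unprojectable_fields:
                continue  # silently drop the extra data-file column
            raise ValueError(f"Could not find field with name {name}")
        projected[name] = dtype
    return projected
```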