
Python: allow projection of Iceberg fields to pyarrow table schema with names #8144

Closed
wants to merge 1 commit

Conversation


@moriyoshi moriyoshi commented Jul 25, 2023

Fixes #7451.

Design note

In this PR, the following two new keyword arguments are introduced to `Table.to_pyarrow`, `Table.to_pandas`, and the like.

  • match_with_field_name (bool)
    • Setting this to True instructs it to map fields between the data file (pyarrow) schema and the Iceberg schema by name when the mapping by field ids is not feasible. Setting this to False keeps the normal behavior.
  • ignore_unprojectable_fields (bool)
    • Setting this to True instructs it to ignore fields that are present in the data file (pyarrow) schema but absent from the Iceberg schema.
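As a rough illustration of the policy these two flags describe (this helper and its signature are hypothetical, not the PR's actual code), the matching logic could be sketched as:

```python
from typing import Dict, Optional

def resolve_field_id(
    file_field_name: str,
    file_field_id: Optional[int],
    iceberg_ids: Dict[int, str],    # Iceberg schema: field id -> field name
    iceberg_names: Dict[str, int],  # Iceberg schema: field name -> field id
    match_with_field_name: bool,
    ignore_unprojectable_fields: bool,
) -> Optional[int]:
    """Map one data-file field to an Iceberg field id, or None to drop it."""
    # Normal behavior: match by the field id embedded in the data file.
    if file_field_id is not None and file_field_id in iceberg_ids:
        return file_field_id
    # match_with_field_name=True: fall back to matching by name when the
    # id-based mapping is not feasible (e.g. files migrated from Delta Lake).
    if match_with_field_name and file_field_name in iceberg_names:
        return iceberg_names[file_field_name]
    # ignore_unprojectable_fields=True: silently drop fields present in the
    # data file but absent from the Iceberg schema; otherwise fail loudly.
    if ignore_unprojectable_fields:
        return None
    raise ValueError(f"Could not find field with name {file_field_name}")
```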

@moriyoshi moriyoshi force-pushed the feature/impl-7451 branch from 3bcd1a3 to e303f1a Compare July 26, 2023 09:34
@moriyoshi moriyoshi marked this pull request as ready for review July 26, 2023 09:34
@moriyoshi
Author

Tests are ready; the mypy checks fail due to pre-existing oddities.

Contributor

@JonasJ-ap JonasJ-ap left a comment


Thanks for your great contribution @moriyoshi. It is really a nice feature to add!

I conducted some tests using an Iceberg table migrated from Delta Lake (no field ids in the data files), which was created via the following Spark DataFrame:

spark
            .range(0, 5, 1, 5)
            .withColumn("longCol", expr("id"))
            .withColumn("decimalCol", expr("CAST(longCol AS DECIMAL(10, 2))"))
            .withColumn("magic_number", expr("rand(5) * 100"))
            .withColumn("dateCol", date_add(current_date(), 1))
            .withColumn("dateString", expr("CAST(dateCol AS STRING)"))
            .withColumn("random1", expr("CAST(rand(5) * 100 as LONG)"))
            .withColumn("random2", expr("CAST(rand(51) * 100 as LONG)"))
            .withColumn("random3", expr("CAST(rand(511) * 100 as LONG)"))
            .withColumn("random4", expr("CAST(rand(15) * 100 as LONG)"))
            .withColumn("random5", expr("CAST(rand(115) * 100 as LONG)"))
            .withColumn("innerStruct1", expr("STRUCT(random1, random2)"))
            .withColumn("innerStruct2", expr("STRUCT(random3, random4)"))
            .withColumn("structCol1", expr("STRUCT(innerStruct1, innerStruct2)"))
            .withColumn(
                "innerStruct3",
                expr("STRUCT(SHA1(CAST(random5 AS BINARY)), SHA1(CAST(random1 AS BINARY)))"))
            .withColumn(
                "structCol2",
                expr(
                    "STRUCT(innerStruct3, STRUCT(SHA1(CAST(random2 AS BINARY)), SHA1(CAST(random3 AS BINARY))))"))
            .withColumn("arrayCol", expr("ARRAY(random1, random2, random3, random4, random5)"))
            .withColumn("arrayStructCol", expr("ARRAY(innerStruct1, innerStruct1, innerStruct1)"))
            .withColumn("mapCol1", expr("MAP(structCol1, structCol2)"))
            .withColumn("mapCol2", expr("MAP(longCol, dateString)"))
            .withColumn("mapCol3", expr("MAP(dateCol, arrayCol)"))
            .withColumn("structCol3", expr("STRUCT(structCol2, mapCol3, arrayCol)"));

I'd like to highlight some areas where we could potentially improve.

  1. If we choose to filter out some nested fields, pyarrow_to_schema will fail even with ignore_unprojectable_fields = True.
  2. Maybe we can let pyarrow_to_schema take the complete table schema rather than the projected schema. In this way, we can focus on dealing with fields that are missing from the table schema and let
    file_project_schema = prune_columns(file_schema, projected_field_ids, select_full_types=False)
    handle the unselected columns.
  3. If we have a MapType whose key type is nested, pyarrow_to_schema will fail to switch to the correct inner schema when visiting the MapType's value.

Please correct me if I misunderstand something.

@@ -360,6 +361,65 @@ def test_schema_to_pyarrow_schema(table_schema_nested: Schema) -> None:
assert repr(actual) == expected


def test_pyarrow_to_schema(table_schema_simple: Schema, table_schema_nested: Schema) -> None:
Contributor


Do you think it's a good idea to move these two tests to test_pyarrow_visitor.py (which contains the other tests for pyarrow_to_schema)? We currently have too many tests in test_pyarrow.py; I think we may want to stop adding new tests to it and consider refactoring it into different files.

Author


Yes, now that test_pyarrow.py contains some backend-specific tests, it makes more sense to move those visitor-related tests to another file.

) -> Schema:
visitor = _ConvertToIceberg(projected_schema, match_with_field_name, ignore_unprojectable_fields)
ib_schema = visit_pyarrow(schema, visitor)
assert isinstance(ib_schema, StructType)
Contributor


Could you please replace it with a ValueError or something similar? According to comments on other PRs, we try to avoid assert outside tests/.

Author


This is not intended to warn the user about wrong usage. It is a type guard for mypy, and I believe it's valid.
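For readers unfamiliar with the point being made: an assert like this narrows types for mypy rather than validating user input. A minimal, unrelated illustration:

```python
from typing import Union

def bump(value: Union[int, str]) -> int:
    # Type guard: after this assert, mypy narrows `value` to int, so the
    # arithmetic below type-checks without an explicit cast().
    assert isinstance(value, int)
    return value + 1
```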

python/pyiceberg/io/pyarrow.py (outdated; resolved)
python/pyiceberg/io/pyarrow.py (outdated; resolved)
@@ -749,7 +869,16 @@ def _task_to_table(
schema_raw = metadata.get(ICEBERG_SCHEMA)
# TODO: if field_ids are not present, Name Mapping should be implemented to look them up in the table schema,
Contributor

@JonasJ-ap JonasJ-ap Jul 28, 2023


I think this TODO can be removed?

Author


Right, considering the purpose of this PR :)

if not self.match_with_field_name:
raise
if projected_field is None and self.match_with_field_name:
projected_field = self.projected_schema.find_field(field.name)
Contributor


[Question] Do we need the try...except and the case on ignore_unprojectable_fields here?

Contributor


For example, I have a table with schema:

Schema, id=0
                      ├── 1: name: optional struct<3: firstname: optional string, 4: middlename: optional string, 5: lastname: optional
                      │   string>
                      └── 2: address: optional struct<6: current: optional struct<8: state: optional string, 9: city: optional string>, 7:
                          previous: optional struct<10: state: optional string, 11: city: optional string>>

If I only query the name column:

catalog.load_table(table_name).scan(selected_fields=("name", )).to_pandas(match_with_field_name=True, ignore_unprojectable_fields=True)

I get the following error, raised by the find_field here:

ValueError: Could not find field with name address, case_sensitive=True
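A hedged sketch (hypothetical helper, not the PR's code) of how the lookup could honor ignore_unprojectable_fields instead of letting the ValueError escape:

```python
from typing import Dict, Optional

def find_field_or_skip(
    fields: Dict[str, int],  # stand-in for the projected schema's name lookup
    name: str,
    ignore_unprojectable_fields: bool,
) -> Optional[int]:
    """Look up a field by name; return None instead of raising when allowed."""
    if name in fields:
        return fields[name]
    if ignore_unprojectable_fields:
        # The field exists in the data file but was not selected by the scan
        # projection; drop it rather than failing the whole read.
        return None
    raise ValueError(f"Could not find field with name {name}, case_sensitive=True")
```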

Author


This is exactly what was addressed in that comment, and it has been fixed.

self.next = None
if not isinstance(field.type, (pa.StructType, pa.ListType, pa.MapType)):
return
(self.projected_schema, self.next) = self.projected_schema_stack.pop()
Contributor


I think we may want to step self.next forward (key -> value, value -> None) when popping from the stack, since by the after_field phase we have already visited the field that the self.next stored on the stack pointed to.

This can cause an error when we have a MapType whose key and value types are both nested:

19: mapCol1: optional map<struct<47: innerStruct1: optional struct<49: random1: optional long, 50: random2:
                      │   optional long>, 48: innerStruct2: optional struct<51: random3: optional long, 52: random4: optional long>>,
                      │   struct<53: innerStruct3: optional struct<55: col1: optional string, 56: col2: optional string>, 54: col2: optional
                      │   struct<57: col1: optional string, 58: col2: optional string>>>

In this example, the visitor will use the key's projected schema to search for the value's name, resulting in a

ValueError: Could not find field with name innerStruct3, case_sensitive=True
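The suggested fix, reduced to its essence (names here are illustrative, not the actual visitor's): when restoring state popped from the stack, the saved cursor must be advanced past the part that has already been visited.

```python
from typing import Optional

def advance_map_cursor(cursor: Optional[str]) -> Optional[str]:
    """Step the map-traversal cursor forward: 'key' -> 'value', 'value' -> None."""
    if cursor == "key":
        return "value"
    return None

# Simulated after_field for a map: the visitor pushed ("map_schema", "key")
# before descending into the key; on the way back up it must not reuse "key",
# or the value will be resolved against the key's projected schema.
stack = [("map_schema", "key")]
schema, cursor = stack.pop()
cursor = advance_map_cursor(cursor)  # the value is visited next, not the key
```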

Author


The stack is used only when we hit a composite type during the traversal, so self.next should be valid within such a type. If that part causes the problem described above, then either it doesn't work as intended or another part is the culprit, I guess. I've been trying to reproduce the problem with what I believe is the same schema you used to check the behavior, but have had no luck so far.

Author


Finally I managed to reproduce the problem, and it turned out you are absolutely right! Thanks!

if schema_raw is not None
else pyarrow_to_schema(
physical_schema,
projected_schema,
Contributor


I think we may want the full table schema here. pyarrow_to_schema is supposed to simply convert the physical schema to the Iceberg schema without handling column pruning. However, projected_schema only contains the columns selected in a table scan. If we use it during the conversion, we will have to ignore unselected columns, which I think is unnecessary and tricky to implement (and also inconsistent with the behavior when field ids are present).

Author


It doesn't have much to do with pruning. What we need to achieve with ignore_unprojectable_fields here is simply to ignore redundant columns in the actual data, whereas the purpose of pruning is to take away fields that are already known according to the catalog. Those are similar, but have different semantics.

@moriyoshi
Author

Sorry for leaving this off for a while; I've been quite busy. Let me try to answer the questions.

I'd like to highlight some areas where we could potentially improve.

  1. If we choose to filter out some nested field, pyarrow_to_schema will fail even with ignore_unprojectable_fields = True.

This was a bug to be addressed, and I just pushed the fix. Could you take a look?

  2. Maybe we can let pyarrow_to_schema take the complete table schema rather than the projected schema. In this way, we can focus on dealing with fields that are missing from the table schema and let
    file_project_schema = prune_columns(file_schema, projected_field_ids, select_full_types=False)

    handle the unselected columns.

Actually, what pyarrow_to_schema expects to get as projected_schema is the schema in the catalog (the word "projected" isn't a great fit in this context, I think), and if the "complete table schema" refers to the catalog schema, the behavior should be pretty much the same as what is expected here.

  3. If we have a MapType whose key type is nested, pyarrow_to_schema will fail to switch to the correct inner schema when visiting the MapType's value.

This should've been fixed along with 1.

@moriyoshi moriyoshi force-pushed the feature/impl-7451 branch 3 times, most recently from 9eeacbf to 57af4a5 Compare August 30, 2023 12:00
@moriyoshi
Author

@JonasJ-ap Can you have a look at this again?

…th field names when field ids are not available in data files.
@Fokko
Contributor

Fokko commented Oct 2, 2023

Hey @moriyoshi, thanks for creating this PR. Could you re-create it against the https://github.com/apache/iceberg-python repository? We're migrating the Python implementation to its own repo, and I overlooked this PR.

@Fokko Fokko closed this Oct 2, 2023

Successfully merging this pull request may close these issues.

Python: Implement Name Mapping to construct iceberg schema when field ids are not present in Data files