Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FEAT] Iceberg API Stubs #1406

Closed
wants to merge 7 commits into from

Conversation

samster25
Copy link
Member

No description provided.

@github-actions github-actions bot added the enhancement New feature or request label Sep 22, 2023
from daft.logical.schema import Field, Schema


class IcebergCatalog:

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jaychia and @samster25 You might want to familiarize yourselves with the PyIceberg API and how it uses Catalogs. I'd like to discuss using the PyIceberg library for the frontend operations because the logic it's doing for scan-planning and pushdown is quite complicated and very important for performant filtering.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks! Yes we're currently evaluating a few options for calling into Iceberg functionality/libraries for both reads and writes.

It feels like the Java library is currently the most fully-featured/up to date? We were considering using bindings from Rust into Java to make this work.

Would love to chat about the potential options at our disposal and the pros/cons of each next week.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@danielcweeks, this API is intended to be user facing and not our underlying internal API. When a user is query from a iceberg table, we intend to insert an IcebergDataSource into our query plan. Then when we actually execute the query, we can push down predicates, partition filters, etc into IcebergDataSource via our optimizer.

Copy link
Contributor

@Fokko Fokko Sep 27, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey everyone!

This is looking great! Like @danielcweeks already mentioned, if it is a possibility, it would be great to hook up with the PyIceberg API. I think there is a lot of overlap there with the code in this PR.

For example:

  • The creator method that we use is load_catalog, which accepts a type keyword argument that is either glue, dynamodb, rest and sql (which is just backed by a database). Each of the types have their own configuration: https://py.iceberg.apache.org/configuration/
  • The catalog offers all the CRUD operations such as {create,list,drop,load}_{namespace,table}. These all come with PyIceberg for free. The tricky part is that the locking is often done at the catalog level. For example, updating a column in the case of a REST catalog is easy since it is being done by the catalog itself. When using a Hive catalog, you need to obtain a lock and update the Iceberg metadata yourself, which requires quite a bit of code and testing.
cat = load_catalog('default', type='glue')  # will pick up the local AWS configuration

If you then load a table:

tbl = cat.load_table('default.taxis')

Then PyIceberg will load the metadata.json from the object store. This table gives you a rich API to update the table using the newly introduced schema-evolution API. These are all metadata operations and don't really require a backend.

If you then start a query:

tbl.scan(row_filter="trip_distance > 10.0")).to_arrow()

Then PyIceberg will do the actual query planning, and start pruning the irrelevant files based on the partitions and metrics. A short intro on the subject is explained here. The to_arrow() will actually fetch the parquet files as well, and turn it into a PyArrow table.

What I would recommend (but this is up to you of course), is to do the query planning in PyIceberg, and then scale out the actual heavy lifting using Daft. With PyIceberg you have the plan_files method that will return all the relevant Parquet files (one task refers to a single Parquet file, with possible positional/equality delete files) and this can then be read in a distributed fashion.

I think there are also a few caveats that need to be taken care of:

  • Iceberg is lazy, so there can be Parquet files that have different schemas, but they are compatible
  • In the case of positional/equality deletes, you either want to apply the deletes, or make sure you fail to avoid returning incorrect data to the end-user.

What we did for Polars recently, and I think that's a great example:

import polars as pl

# Ability to point directly to a table on s3 
pl.scan_iceberg("s3://bucket/table/metdata.json")

# Accept a PyIceberg table:
cat = load_catalog('glue')
tbl = cat.load_table('default.taxis')

# Will push down the predicate to PyIceberg
pl.scan_iceberg(tbl).filter(pl.col("trip_distance") > 10.0).collect()

Question by @jaychia:

It feels like the Java library is currently the most fully-featured/up to date? We were considering using bindings from Rust into Java to make this work.

The Java library is the reference implementation and contains the latest features.

Here we load the data into an Arrow table and return it to Polars. I don't think that's very applicable for Daft since you probably want to scale out this process. Hope this is some food for thought, happy to help and looking forward to continuing the discussion! 👍

@samster25 samster25 closed this Mar 26, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants