[FEAT] Iceberg API Stubs #1406
Conversation
```python
from daft.logical.schema import Field, Schema

class IcebergCatalog:
```
@jaychia and @samster25 You might want to familiarize yourselves with the PyIceberg API and how it uses Catalogs. I'd like to discuss using the PyIceberg library for the frontend operations because the logic it's doing for scan-planning and pushdown is quite complicated and very important for performant filtering.
Thanks! Yes we're currently evaluating a few options for calling into Iceberg functionality/libraries for both reads and writes.
It feels like the Java library is currently the most fully-featured/up to date? We were considering using bindings from Rust into Java to make this work.
Would love to chat about the potential options at our disposal and the pros/cons of each next week.
@danielcweeks, this API is intended to be user-facing and not our underlying internal API. When a user queries an Iceberg table, we intend to insert an `IcebergDataSource` into our query plan. Then, when we actually execute the query, we can push down predicates, partition filters, etc. into the `IcebergDataSource` via our optimizer.
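To make that split concrete, here is a rough sketch of the intended flow. All names here, including `read_iceberg`, are illustrative rather than the actual Daft API:

```python
import daft

# Illustrative only: a user-facing read that plants an IcebergDataSource node
# in the logical plan instead of eagerly reading any files.
df = daft.read_iceberg("default.taxis")

# The filter is recorded lazily; at optimization time the predicate (and any
# partition filters) can be pushed down into the IcebergDataSource scan node.
df = df.where(df["trip_distance"] > 10.0)

df.collect()  # only now is the scan planned and executed
```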
Hey everyone!
This is looking great! Like @danielcweeks already mentioned, if it is a possibility, it would be great to hook up with the PyIceberg API. I think there is a lot of overlap there with the code in this PR.
For example:
- The creator method that we use is `load_catalog`, which accepts a `type` keyword argument that is one of `glue`, `dynamodb`, `rest`, or `sql` (which is just backed by a database). Each of the types has its own configuration: https://py.iceberg.apache.org/configuration/
- The catalog offers all the CRUD operations such as `{create,list,drop,load}_{namespace,table}`. These all come with PyIceberg for free (a short sketch of these calls follows the example below). The tricky part is that the locking is often done at the catalog level. For example, updating a column in the case of a REST catalog is easy since it is done by the catalog itself. When using a Hive catalog, you need to obtain a lock and update the Iceberg metadata yourself, which requires quite a bit of code and testing.
```python
from pyiceberg.catalog import load_catalog

cat = load_catalog('default', type='glue')  # will pick up the local AWS configuration
```
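For illustration, the namespace/table CRUD surface looks roughly like this, continuing with the `cat` from above (the identifiers are made up):

```python
# Namespace/table CRUD on the catalog; "default" and the table names are illustrative
cat.create_namespace("default")
print(cat.list_namespaces())       # e.g. [("default",)]
print(cat.list_tables("default"))  # e.g. [("default", "taxis")]
cat.drop_table("default.old_taxis")
```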
If you then load a table:

```python
tbl = cat.load_table('default.taxis')
```

Then PyIceberg will load the `metadata.json` from the object store. This table gives you a rich API to update the table using the newly introduced schema-evolution API. These are all metadata operations and don't really require a backend.
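As a minimal sketch of that schema-evolution API, continuing with `tbl` from above (the column name and doc string are made up):

```python
from pyiceberg.types import IntegerType

# Adding a column is a pure metadata operation; no data files are rewritten
with tbl.update_schema() as update:
    update.add_column("passenger_rating", IntegerType(), doc="illustrative column")
```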
If you then start a query:

```python
tbl.scan(row_filter="trip_distance > 10.0").to_arrow()
```

Then PyIceberg will do the actual query planning, and start pruning the irrelevant files based on the partitions and metrics. A short intro on the subject is explained here. The `to_arrow()` will actually fetch the Parquet files as well, and turn them into a PyArrow table.
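The scan also accepts a column projection, which gives an engine like Daft a natural target for projection pushdown as well. A sketch, assuming the column names from the taxis example:

```python
# Push both a predicate and a projection down into the PyIceberg scan
arrow_table = tbl.scan(
    row_filter="trip_distance > 10.0",
    selected_fields=("VendorID", "trip_distance"),
).to_arrow()
```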
What I would recommend (but this is up to you of course) is to do the query planning in PyIceberg, and then scale out the actual heavy lifting using Daft. With PyIceberg you have the `plan_files` method that will return all the relevant Parquet files (one task refers to a single Parquet file, with possible positional/equality delete files), and this can then be read in a distributed fashion.
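A minimal sketch of what that hand-off could look like; the attribute names follow PyIceberg's `FileScanTask`, and what you do with each file path is up to Daft:

```python
# Plan in PyIceberg, then ship only file paths (plus any delete files) to the engine
scan = tbl.scan(row_filter="trip_distance > 10.0")
for task in scan.plan_files():
    # task.file is the data file; task.delete_files are attached positional/equality deletes
    print(task.file.file_path, task.file.record_count, len(task.delete_files))
```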
I think there are also a few caveats that need to be taken care of:
- Iceberg evolves schemas lazily, so there can be Parquet files that have different schemas, but they are still compatible with the table schema.
- In the case of positional/equality deletes, you either want to apply the deletes, or make sure you fail, to avoid returning incorrect data to the end-user (see the sketch below).
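For that second caveat, a defensive check along these lines (a sketch, reusing the `scan` from above) keeps you from silently returning wrong results:

```python
for task in scan.plan_files():
    if task.delete_files:
        # Fail loudly until positional/equality deletes are applied correctly
        raise NotImplementedError(
            f"{task.file.file_path} has delete files attached; refusing to read it"
        )
```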
What we did for Polars recently, and I think that's a great example:

```python
import polars as pl
from pyiceberg.catalog import load_catalog

# Ability to point directly to a table on s3
pl.scan_iceberg("s3://bucket/table/metadata.json")

# Accept a PyIceberg table:
cat = load_catalog('glue')
tbl = cat.load_table('default.taxis')

# Will push down the predicate to PyIceberg
pl.scan_iceberg(tbl).filter(pl.col("trip_distance") > 10.0).collect()
```
Question by @jaychia:

> It feels like the Java library is currently the most fully-featured/up to date? We were considering using bindings from Rust into Java to make this work.

The Java library is the reference implementation and contains the latest features.
In the Polars example above, we load the data into an Arrow table and return it to Polars. I don't think that's very applicable for Daft, since you probably want to scale out this process. Hope this is some food for thought, happy to help and looking forward to continuing the discussion! 👍