[FEAT] Iceberg API Stubs #1406
Closed
Commits (7)
c96251d  initial stubs (samster25)
1d0c126  add methods for updates (samster25)
b892f4a  add comments for namespace (samster25)
53f3f36  :s/overwrite/save (samster25)
c2ccd43  append example (samster25)
6654bdd  Add comments
0c85878  simplify examples
# mypy: ignore-errors

from __future__ import annotations

from contextlib import AbstractContextManager

import daft
from daft.daft import PartitionScheme, PartitionSpec
from daft.dataframe import DataFrame
from daft.logical.schema import Field, Schema


class IcebergCatalog:
    # creator functions
    @classmethod
    def from_glue(cls, args) -> IcebergCatalog:
        ...

    @classmethod
    def from_storage(cls, args) -> IcebergCatalog:
        ...

    # namespace functions
    def list_namespaces(self) -> DataFrame:
        ...

    def create_namespace(self, namespace: str) -> bool:
        ...

    def drop_namespace(self, namespace: str) -> bool:
        ...

    # table functions
    def list_tables(self) -> DataFrame:
        ...

    def create_table(
        self,
        identifier: str,
        schema: Schema,
        partition_spec: PartitionSpec | None = None,
        sort_columns: list[str] | None = None,
    ) -> Table:
        ...

    def load_table(self, identifier: str) -> Table:
        ...


class Table:
    def history(self) -> DataFrame:
        ...

    def schema(self) -> Schema:
        ...

    def schemas(self) -> dict[int, Schema]:
        ...

    def partition_spec(self) -> PartitionSpec:
        ...

    def partition_specs(self) -> dict[int, PartitionSpec]:
        ...

    def update_schema(self) -> SchemaUpdate:
        ...

    def update_partition_spec(self) -> PartitionSpecUpdate:
        ...


class SchemaUpdate(AbstractContextManager):
    def add_column(self, field: Field) -> SchemaUpdate:
        ...

    def drop_column(self, name: str) -> SchemaUpdate:
        ...

    def rename_column(self, name: str, new_name: str) -> SchemaUpdate:
        ...


class PartitionSpecUpdate(AbstractContextManager):
    def add_partitioning_field(self, name: str, scheme: PartitionScheme) -> PartitionSpecUpdate:
        ...


class DataframeIcebergNamespace:
    def __init__(self, df: DataFrame) -> None:
        self.df = df

    @classmethod
    def read(cls, catalog: IcebergCatalog, table_identifier: str, snapshot_id: int | None = None) -> DataFrame:
        """Produces a lazy Daft DataFrame that is backed by an Iceberg table.

        Args:
            catalog (IcebergCatalog): Iceberg catalog to read from
            table_identifier (str): table name to read from
            snapshot_id (Optional[int], optional): snapshot id of the table to read from. Defaults to None, which reads the latest snapshot.

        Returns:
            DataFrame: a lazy Daft DataFrame that is backed by the input Iceberg table.
        """
        ...

    def append(self, catalog: IcebergCatalog, table_identifier: str) -> None:
        """Appends records from a Daft DataFrame to an Iceberg table, following its partition spec.

        This operation does not affect any of the existing records in the Iceberg table.

        Args:
            catalog (IcebergCatalog): Iceberg catalog to write to
            table_identifier (str): table name to write to
        """
        ...

    def save(self, catalog: IcebergCatalog, table_identifier: str) -> None:
        """Saves the records of a Daft DataFrame to an Iceberg table, overwriting the table's existing records.

        This operation follows the Iceberg table's schema, partition spec, and properties when writing the new records.

        Args:
            catalog (IcebergCatalog): Iceberg catalog to write to
            table_identifier (str): table name to write to
        """
        ...


#################
# EXAMPLE USAGE #
#################


def example_deleting_rows() -> None:
    """Delete rows from an Iceberg table"""
    # 1. Read a dataframe from an Iceberg table on AWS Glue
    catalog = IcebergCatalog.from_glue("path/to/glue")
    df: DataFrame = daft.read_iceberg(catalog, "my-glue-database.my-table")

    # 2. Run filters on the dataframe itself
    df = df.where((df["id"] > 10) & (df["id"] < 20))

    # 3. Save the dataframe back to your table
    df.iceberg.save(catalog, "my-glue-database.my-table")


def example_updating_rows() -> None:
    """Update an Iceberg table"""
    # 1. Read a dataframe from an Iceberg table on AWS Glue
    catalog = IcebergCatalog.from_glue("path/to/glue")
    df: DataFrame = daft.read_iceberg(catalog, "my-glue-database.my-table")

    # 2. Run updates on the dataframe itself
    df = df.with_column("x", (df["x"] < 10).if_else(0, df["x"]))

    # 3. Save the dataframe back to your table
    df.iceberg.save(catalog, "my-glue-database.my-table")


def example_appending_rows() -> None:
    """Appending data to an Iceberg table"""
    # 1. Load new data in a dataframe
    new_data_df = daft.read_parquet("s3://my/new/data/**.pq")

    # 2. Append the new data to the Iceberg table
    catalog = IcebergCatalog.from_glue("path/to/glue")
    new_data_df.iceberg.append(catalog, "my-glue-database.my-table")
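

# Illustrative sketch: one more example showing how the SchemaUpdate context
# manager above might be used. The commit-on-exit behavior and the way a Field
# is constructed are assumptions, not part of the stubs.
def example_evolving_schema() -> None:
    """Add and rename columns on an Iceberg table"""
    # 1. Load the table from the catalog
    catalog = IcebergCatalog.from_glue("path/to/glue")
    table = catalog.load_table("my-glue-database.my-table")

    # 2. Stage schema changes inside the context manager; assume they are
    #    committed to the table when the block exits.
    with table.update_schema() as update:
        new_field: Field = ...  # construct the Field for the new column here
        update.add_column(new_field)
        update.rename_column("x", "x_renamed")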
@jaychia and @samster25, you might want to familiarize yourselves with the PyIceberg API and how it uses Catalogs. I'd like to discuss using the PyIceberg library for the frontend operations, because the scan-planning and pushdown logic it implements is quite complicated and very important for performant filtering.
Thanks! Yes we're currently evaluating a few options for calling into Iceberg functionality/libraries for both reads and writes.
It feels like the Java library is currently the most fully-featured/up to date? We were considering using bindings from Rust into Java to make this work.
Would love to chat about the potential options at our disposal and the pros/cons of each next week.
@danielcweeks, this API is intended to be user facing and not our underlying internal API. When a user queries an Iceberg table, we intend to insert an IcebergDataSource into our query plan. Then, when we actually execute the query, we can push down predicates, partition filters, etc. into the IcebergDataSource via our optimizer.
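Purely as an illustration of that plan-node idea (the fields below are hypothetical, not an existing Daft API):

from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class IcebergDataSource:
    # Hypothetical sketch of such a plan node; the optimizer would fill in the
    # pushdown fields before execution. Field names are illustrative only.
    table_identifier: str
    snapshot_id: int | None = None
    pushed_predicates: list = field(default_factory=list)
    pushed_partition_filters: list = field(default_factory=list)
    projected_columns: list[str] | None = None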
Hey everyone!
This is looking great! Like @danielcweeks already mentioned, if it is a possibility, it would be great to hook up with the PyIceberg API. I think there is a lot of overlap there with the code in this PR.
For example: load_catalog, which accepts a type keyword argument that is either glue, dynamodb, rest, or sql (which is just backed by a database). Each of the types has its own configuration: https://py.iceberg.apache.org/configuration/

The catalog then gives you {create,list,drop,load}_{namespace,table}. These all come with PyIceberg for free. The tricky part is that the locking is often done at the catalog level. For example, updating a column in the case of a REST catalog is easy since it is being done by the catalog itself. When using a Hive catalog, you need to obtain a lock and update the Iceberg metadata yourself, which requires quite a bit of code and testing.

If you then load a table:
Then PyIceberg will load the metadata.json from the object store. This table gives you a rich API to update the table using the newly introduced schema-evolution API. These are all metadata operations and don't really require a backend.

If you then start a query:
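A minimal sketch; the filter and selected columns here are made up:

from pyiceberg.expressions import EqualTo

# Plan and execute a scan; predicates and projections feed into the planning.
arrow_table = table.scan(
    row_filter=EqualTo("id", 5),
    selected_fields=("id", "name"),
).to_arrow()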
Then PyIceberg will do the actual query planning and start pruning the irrelevant files based on the partitions and metrics. A short intro on the subject is explained here. The to_arrow() call will actually fetch the Parquet files as well and turn them into a PyArrow table.

What I would recommend (but this is up to you, of course) is to do the query planning in PyIceberg and then scale out the actual heavy lifting using Daft. With PyIceberg you have the plan_files method, which will return all the relevant Parquet files (one task refers to a single Parquet file, with possible positional/equality delete files), and these can then be read in a distributed fashion.
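A sketch of that hand-off, with a made-up filter; reading via daft.read_parquet is just one way to consume the planned files:

import daft
from pyiceberg.expressions import GreaterThan

# Let PyIceberg do the pruning, then hand the surviving Parquet files to Daft.
scan = table.scan(row_filter=GreaterThan("id", 10))
file_paths = [task.file.file_path for task in scan.plan_files()]

# Each task points at a single Parquet file (plus any positional/equality delete
# files, which this sketch ignores).
df = daft.read_parquet(file_paths)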
I think there are also a few caveats that need to be taken care of.

What we did for Polars recently is, I think, a great example: there we load the data into an Arrow table and return it to Polars. I don't think that's very applicable for Daft, since you probably want to scale out this process.

Question by @jaychia: "It feels like the Java library is currently the most fully-featured/up to date?" The Java library is the reference implementation and contains the latest features.

Hope this is some food for thought, happy to help and looking forward to continuing the discussion! 👍