From c96251d1a944ad58b7a3dedbbbe39c20fd7a8da5 Mon Sep 17 00:00:00 2001 From: Sammy Sidhu Date: Thu, 21 Sep 2023 21:16:59 -0700 Subject: [PATCH 1/7] initial stubs --- daft/stubs/catalog/__init__.py | 0 daft/stubs/catalog/iceberg.py | 106 +++++++++++++++++++++++++++++++++ 2 files changed, 106 insertions(+) create mode 100644 daft/stubs/catalog/__init__.py create mode 100644 daft/stubs/catalog/iceberg.py diff --git a/daft/stubs/catalog/__init__.py b/daft/stubs/catalog/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/daft/stubs/catalog/iceberg.py b/daft/stubs/catalog/iceberg.py new file mode 100644 index 0000000000..5cdcfc7ad7 --- /dev/null +++ b/daft/stubs/catalog/iceberg.py @@ -0,0 +1,106 @@ +# mypy: ignore-errors + +from __future__ import annotations + +from contextlib import AbstractContextManager + +import daft +from daft.daft import PartitionSpec +from daft.dataframe import DataFrame +from daft.logical.schema import Schema + + +class IcebergCatalog: + # creator functions + @classmethod + def from_glue(cls, args) -> IcebergCatalog: + ... + + def from_storage(cls, args) -> IcebergCatalog: + ... + + # namespace functions + def list_namespaces(self) -> DataFrame: + ... + + def create_namespace(self, namespace: str) -> bool: + ... + + def drop_namespace(self, namespace: str) -> bool: + ... + + # table functions + def list_tables(self) -> DataFrame: + ... + + def create_table( + self, + identifier: str, + schema: Schema, + partition_spec: PartitionSpec | None = None, + sort_columns: list[str] | None = None, + ) -> Table: + ... + + def load_table(self, identifier: str) -> Table: + ... + + +class Table: + def history(self) -> DataFrame: + ... + + def schema(self) -> Schema: + ... + + def schemas(self) -> dict[int, Schema]: + ... + + def partition_spec(self) -> PartitionSpec: + ... + + def partition_specs(self) -> dict[int, PartitionSpec]: + ... + + def update_schema(self) -> SchemaUpdate: + ... + + def update_partition_spec(self) -> PartitionSpecUpdate: + ... + + +class SchemaUpdate(AbstractContextManager): + ... + + +class PartitionSpecUpdate(AbstractContextManager): + ... + + +class DataframeIcebergNamespace: + def __init__(self, df: DataFrame) -> None: + self.df = df + + @classmethod + def read(cls, catalog: IcebergCatalog, table_identifier: str, snapshot_id: int) -> DataFrame: + ... + + def append(self, catalog: IcebergCatalog, table_identifier: str) -> None: + ... + + def overwrite(self, catalog: IcebergCatalog, table_identifier: str) -> None: + ... + + +def example_deleting_rows() -> None: + catalog = IcebergCatalog.from_glue("path/to/glue") + df: DataFrame = daft.read_iceberg(catalog, "my-table") + df = df.where(df["id"] > 10 & df["id"] < 20) + df.iceberg.overwrite(catalog, "my-table") + + +def example_updating_rows() -> None: + catalog = IcebergCatalog.from_glue("path/to/glue") + df: DataFrame = daft.read_iceberg(catalog, "my-table") + df = df.with_column("x", (df["x"] < 10).if_else(0, df["x"])) + df.iceberg.overwrite(catalog, "my-table") From 1d0c12642305b0939bb0b3db0e95c85cb3e89774 Mon Sep 17 00:00:00 2001 From: Sammy Sidhu Date: Fri, 22 Sep 2023 13:26:20 -0700 Subject: [PATCH 2/7] add methods for updates --- daft/stubs/catalog/iceberg.py | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/daft/stubs/catalog/iceberg.py b/daft/stubs/catalog/iceberg.py index 5cdcfc7ad7..11f60db532 100644 --- a/daft/stubs/catalog/iceberg.py +++ b/daft/stubs/catalog/iceberg.py @@ -5,9 +5,9 @@ from contextlib import AbstractContextManager import daft -from daft.daft import PartitionSpec +from daft.daft import PartitionScheme, PartitionSpec from daft.dataframe import DataFrame -from daft.logical.schema import Schema +from daft.logical.schema import Field, Schema class IcebergCatalog: @@ -70,11 +70,19 @@ def update_partition_spec(self) -> PartitionSpecUpdate: class SchemaUpdate(AbstractContextManager): - ... + def add_column(self, field: Field) -> SchemaUpdate: + ... + + def drop_column(self, name: str) -> SchemaUpdate: + ... + + def rename_column(self, name: str, new_name: str) -> SchemaUpdate: + ... class PartitionSpecUpdate(AbstractContextManager): - ... + def add_partitioning_field(self, name: str, scheme: PartitionScheme) -> PartitionSpecUpdate: + ... class DataframeIcebergNamespace: From b892f4a60064853841e21d12d9e9f6a97b9d59dc Mon Sep 17 00:00:00 2001 From: Sammy Sidhu Date: Fri, 22 Sep 2023 13:58:25 -0700 Subject: [PATCH 3/7] add comments for namespace --- daft/stubs/catalog/iceberg.py | 26 +++++++++++++++++++++++++- 1 file changed, 25 insertions(+), 1 deletion(-) diff --git a/daft/stubs/catalog/iceberg.py b/daft/stubs/catalog/iceberg.py index 11f60db532..e24e36b2b7 100644 --- a/daft/stubs/catalog/iceberg.py +++ b/daft/stubs/catalog/iceberg.py @@ -90,13 +90,37 @@ def __init__(self, df: DataFrame) -> None: self.df = df @classmethod - def read(cls, catalog: IcebergCatalog, table_identifier: str, snapshot_id: int) -> DataFrame: + def read(cls, catalog: IcebergCatalog, table_identifier: str, snapshot_id: int | None = None) -> DataFrame: + """Produces a lazy daft DataFrame that is backed by an Iceberg table. + + Args: + catalog (IcebergCatalog): Iceberg catalog to read from + table_identifier (str): table name to read from + snapshot_id (Optional[int], optional): snapshot id of table to read from. Defaults to None, which is the latest snapshot. + + Returns: + DataFrame: a lazy daft dataframe that is backed by the input iceberg table. + """ ... def append(self, catalog: IcebergCatalog, table_identifier: str) -> None: + """Appends records from a daft DataFrame into an Iceberg table following it's Partitioning Spec. + This operation will not affect any of the existing records in the Iceberg Table. + + Args: + catalog (IcebergCatalog): Iceberg catalog to write to + table_identifier (str): table name to write to + """ ... def overwrite(self, catalog: IcebergCatalog, table_identifier: str) -> None: + """Overwrites the records in this Iceberg Table from a daft DataFrame. + This operation follows the Iceberg Table's Schema, Partitioning Scheme and properties when writing the new records. + + Args: + catalog (IcebergCatalog): Iceberg catalog to write to + table_identifier (str): table name to write to + """ ... From 53f3f36592b30d222fa1d51a7b9c9933c095ad58 Mon Sep 17 00:00:00 2001 From: Sammy Sidhu Date: Fri, 22 Sep 2023 14:28:16 -0700 Subject: [PATCH 4/7] :s/overwrite/save --- daft/stubs/catalog/iceberg.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/daft/stubs/catalog/iceberg.py b/daft/stubs/catalog/iceberg.py index e24e36b2b7..cf75cced83 100644 --- a/daft/stubs/catalog/iceberg.py +++ b/daft/stubs/catalog/iceberg.py @@ -113,8 +113,8 @@ def append(self, catalog: IcebergCatalog, table_identifier: str) -> None: """ ... - def overwrite(self, catalog: IcebergCatalog, table_identifier: str) -> None: - """Overwrites the records in this Iceberg Table from a daft DataFrame. + def save(self, catalog: IcebergCatalog, table_identifier: str) -> None: + """saves the records in this Iceberg Table from a daft DataFrame. This operation follows the Iceberg Table's Schema, Partitioning Scheme and properties when writing the new records. Args: @@ -128,11 +128,11 @@ def example_deleting_rows() -> None: catalog = IcebergCatalog.from_glue("path/to/glue") df: DataFrame = daft.read_iceberg(catalog, "my-table") df = df.where(df["id"] > 10 & df["id"] < 20) - df.iceberg.overwrite(catalog, "my-table") + df.iceberg.save(catalog, "my-table") def example_updating_rows() -> None: catalog = IcebergCatalog.from_glue("path/to/glue") df: DataFrame = daft.read_iceberg(catalog, "my-table") df = df.with_column("x", (df["x"] < 10).if_else(0, df["x"])) - df.iceberg.overwrite(catalog, "my-table") + df.iceberg.save(catalog, "my-table") From c2ccd4363e6413cd6191e986e1040e870d9bcb0e Mon Sep 17 00:00:00 2001 From: Sammy Sidhu Date: Fri, 22 Sep 2023 14:33:14 -0700 Subject: [PATCH 5/7] append example --- daft/stubs/catalog/iceberg.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/daft/stubs/catalog/iceberg.py b/daft/stubs/catalog/iceberg.py index cf75cced83..e95071c519 100644 --- a/daft/stubs/catalog/iceberg.py +++ b/daft/stubs/catalog/iceberg.py @@ -136,3 +136,9 @@ def example_updating_rows() -> None: df: DataFrame = daft.read_iceberg(catalog, "my-table") df = df.with_column("x", (df["x"] < 10).if_else(0, df["x"])) df.iceberg.save(catalog, "my-table") + + +def example_appending_rows() -> None: + catalog = IcebergCatalog.from_glue("path/to/glue") + df: DataFrame = daft.read_parquet("s3://my-pq-file") + df.iceberg.append(catalog, "my-table") From 6654bdd32510a366a016d65870a8e18ee18af815 Mon Sep 17 00:00:00 2001 From: Jay Chia Date: Fri, 22 Sep 2023 18:13:31 -0700 Subject: [PATCH 6/7] Add comments --- daft/stubs/catalog/iceberg.py | 37 +++++++++++++++++++++++++++++------ 1 file changed, 31 insertions(+), 6 deletions(-) diff --git a/daft/stubs/catalog/iceberg.py b/daft/stubs/catalog/iceberg.py index e95071c519..faa08cf825 100644 --- a/daft/stubs/catalog/iceberg.py +++ b/daft/stubs/catalog/iceberg.py @@ -124,21 +124,46 @@ def save(self, catalog: IcebergCatalog, table_identifier: str) -> None: ... +################# +# EXAMPLE USAGE # +################# + + def example_deleting_rows() -> None: + """Delete rows from an Iceberg table""" + # 1. Grab an iceberg catalog from AWS Glue catalog = IcebergCatalog.from_glue("path/to/glue") - df: DataFrame = daft.read_iceberg(catalog, "my-table") + + # 2. Read dataframe from an Iceberg table in the catalog + df: DataFrame = daft.read_iceberg(catalog, "my-glue-database.my-table") + + # 3. Run filters on the dataframe itself df = df.where(df["id"] > 10 & df["id"] < 20) - df.iceberg.save(catalog, "my-table") + + # 4. Save the dataframe to your table + df.iceberg.save(catalog, "my-glue-database.my-table") def example_updating_rows() -> None: + """Update an Iceberg table""" + # 1. Grab an iceberg catalog from AWS Glue catalog = IcebergCatalog.from_glue("path/to/glue") - df: DataFrame = daft.read_iceberg(catalog, "my-table") + + # 2. Read dataframe from an Iceberg table in the catalog + df: DataFrame = daft.read_iceberg(catalog, "my-glue-database.my-table") + + # 3. Run updates on the dataframe itself df = df.with_column("x", (df["x"] < 10).if_else(0, df["x"])) - df.iceberg.save(catalog, "my-table") + + # 4. Save the dataframe to your table + df.iceberg.save(catalog, "my-glue-database.my-table") def example_appending_rows() -> None: + """Appending data to an Iceberg table""" + # 1. Load new data in a dataframe + new_data_df = daft.read_parquet("s3://my/new/data/**.pq") + + # 2. Append new data to the Iceberg Table catalog = IcebergCatalog.from_glue("path/to/glue") - df: DataFrame = daft.read_parquet("s3://my-pq-file") - df.iceberg.append(catalog, "my-table") + new_data_df.iceberg.append(catalog, "my-glue-database.my-table") From 0c8587839e633238ec92e352c19bfa18823d8881 Mon Sep 17 00:00:00 2001 From: Jay Chia Date: Fri, 22 Sep 2023 18:14:56 -0700 Subject: [PATCH 7/7] simplify examples --- daft/stubs/catalog/iceberg.py | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/daft/stubs/catalog/iceberg.py b/daft/stubs/catalog/iceberg.py index faa08cf825..340c4d4cc3 100644 --- a/daft/stubs/catalog/iceberg.py +++ b/daft/stubs/catalog/iceberg.py @@ -131,31 +131,27 @@ def save(self, catalog: IcebergCatalog, table_identifier: str) -> None: def example_deleting_rows() -> None: """Delete rows from an Iceberg table""" - # 1. Grab an iceberg catalog from AWS Glue + # 1. Read a dataframe from a iceberg table on AWS Glue catalog = IcebergCatalog.from_glue("path/to/glue") - - # 2. Read dataframe from an Iceberg table in the catalog df: DataFrame = daft.read_iceberg(catalog, "my-glue-database.my-table") - # 3. Run filters on the dataframe itself + # 2. Run filters on the dataframe itself df = df.where(df["id"] > 10 & df["id"] < 20) - # 4. Save the dataframe to your table + # 3. Save the dataframe back to your table df.iceberg.save(catalog, "my-glue-database.my-table") def example_updating_rows() -> None: """Update an Iceberg table""" - # 1. Grab an iceberg catalog from AWS Glue + # 1. Read a dataframe from a iceberg table on AWS Glue catalog = IcebergCatalog.from_glue("path/to/glue") - - # 2. Read dataframe from an Iceberg table in the catalog df: DataFrame = daft.read_iceberg(catalog, "my-glue-database.my-table") - # 3. Run updates on the dataframe itself + # 2. Run updates on the dataframe itself df = df.with_column("x", (df["x"] < 10).if_else(0, df["x"])) - # 4. Save the dataframe to your table + # 3. Save the dataframe back to your table df.iceberg.save(catalog, "my-glue-database.my-table")