[FEAT] Daft Catalog API (#3036)
Adds a `DaftCatalog` API to help cement Daft's catalog data access
patterns. Here is the intended UX:

```python

import daft

###
# Registering external catalogs
#
# We recognize `PyIceberg.Catalog`s and `UnityCatalog` for now
# TODO: This should be configurable via a Daft catalog config file (.toml or .yaml)
###

from pyiceberg.catalog import load_catalog

catalog = load_catalog(...)
daft.catalog.register_python_catalog(catalog)

###
# Adding named tables
###

daft.register_table("foo", df)  # name first, then the DataFrame; returns the registered name

###
# Reading tables
###

df1 = daft.read_table("foo")  # first priority is named tables

df2 = daft.read_table("x.y.z")  # next priority is the registered default catalog
df2 = daft.read_table("default.x.y.z")  # equivalent to the previous call

df3 = daft.read_table("my_other_catalog.x.y.z")  # Supports named catalogs other than default one
```
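
The lookup order described in the comments above can be sketched in plain Python. This is not Daft's implementation, which lives in the native `daft-catalog` crate. `MiniMetaCatalog`, its methods, and the dict-backed catalogs are hypothetical stand-ins, and the order simply follows the three priorities listed in the `read_table` docstring.

```python
from __future__ import annotations


class MiniMetaCatalog:
    """Hypothetical stand-in for a meta-catalog; not Daft's real API."""

    DEFAULT = "default"

    def __init__(self) -> None:
        self._named_tables: dict[str, object] = {}
        self._catalogs: dict[str, dict[str, object]] = {}

    def register_table(self, name: str, table: object) -> str:
        self._named_tables[name] = table
        return name

    def register_catalog(self, tables: dict[str, object], name: str | None = None) -> str:
        # Assumption: a catalog registered without a name becomes the default one.
        key = name if name is not None else self.DEFAULT
        self._catalogs[key] = tables
        return key

    def read_table(self, name: str) -> object:
        # 1. Named tables take precedence over everything else.
        if name in self._named_tables:
            return self._named_tables[name]
        # 2. Treat the whole name as a table identifier in the default catalog.
        default_tables = self._catalogs.get(self.DEFAULT, {})
        if name in default_tables:
            return default_tables[name]
        # 3. Treat the first dotted segment as a catalog name.
        catalog_name, _, table_name = name.partition(".")
        tables = self._catalogs.get(catalog_name)
        if tables is not None and table_name in tables:
            return tables[table_name]
        raise KeyError(f"Table not found: {name}")


meta = MiniMetaCatalog()
meta.register_table("foo", "named table foo")
meta.register_catalog({"x.y.z": "default catalog x.y.z"})
meta.register_catalog({"x.y.z": "other catalog x.y.z"}, name="my_other_catalog")

print(meta.read_table("foo"))
print(meta.read_table("x.y.z"))
print(meta.read_table("default.x.y.z"))
print(meta.read_table("my_other_catalog.x.y.z"))
```

In this sketch `"x.y.z"` and `"default.x.y.z"` resolve to the same entry, matching the equivalence noted in the snippet above. How the real resolver handles a named table that shadows a catalog name is not shown in this commit message.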

Other APIs which will be nice as follow-ons:

- [ ] Integrate this with the SQLCatalog API that our SQL stuff uses
- [ ] Detection of catalog from a YAML `~/.daft.yaml` config file
- [ ] Allow for configuring table access (e.g.
`daft.read_table("iceberg_table",
options=daft.catalog.IcebergReadOptions(...))`)
- [ ] Implementations for other catalogs that aren't Python catalogs,
and can support other table types (e.g. Hive and Delta):
    - [ ] `daft.catalog.register_aws_glue()`
    - [ ] `daft.catalog.register_hive_metastore()`
- [ ] `df.write_table("table_name", mode="overwrite|append",
create_if_missing=True)`
- [ ] `df.upsert("table_name", match_columns={...},
update_columns={...}, insert_columns={...})`
- [ ] DDL: allow for easy creation of tables, erroring out if the
selected backend does not support a given table format
    - [ ] `daft.catalog.create_table_parquet(...)`
    - [ ] `daft.catalog.create_table_iceberg(...)`
    - [ ] `daft.catalog.create_table_deltalake(...)`
    - [ ] `daft.catalog.list_tables(...)`
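
For context on how additional Python catalogs might plug in: this commit wraps PyIceberg and Unity behind two small interfaces, `PythonCatalog` (`list_tables`, `load_table`) and `PythonCatalogTable` (`to_dataframe`). The sketch below mirrors those method names with stand-in base classes so it runs without Daft installed. `InMemoryCatalog`, `InMemoryTable`, and returning a plain dict from `to_dataframe` are illustrative assumptions, not part of the commit.

```python
from __future__ import annotations

from abc import ABC, abstractmethod


class PythonCatalogTable(ABC):
    """Stand-in mirroring the method name in daft/catalog/python_catalog.py."""

    @abstractmethod
    def to_dataframe(self) -> dict[str, list]: ...


class PythonCatalog(ABC):
    """Stand-in mirroring the method names in daft/catalog/python_catalog.py.

    ABC is used here so a missing method fails at instantiation; the classes
    in the diff use @abstractmethod without inheriting from ABC.
    """

    @abstractmethod
    def list_tables(self, prefix: str) -> list[str]: ...

    @abstractmethod
    def load_table(self, name: str) -> PythonCatalogTable: ...


class InMemoryTable(PythonCatalogTable):
    """Hypothetical table adaptor backed by a dict of columns."""

    def __init__(self, columns: dict[str, list]) -> None:
        self._columns = columns

    def to_dataframe(self) -> dict[str, list]:
        # Assumption: a real adaptor would return a daft.DataFrame here.
        return dict(self._columns)


class InMemoryCatalog(PythonCatalog):
    """Hypothetical catalog adaptor backed by a dict of tables."""

    def __init__(self, tables: dict[str, dict[str, list]]) -> None:
        self._tables = tables

    def list_tables(self, prefix: str) -> list[str]:
        # Return the sorted names that start with the given prefix.
        return sorted(name for name in self._tables if name.startswith(prefix))

    def load_table(self, name: str) -> InMemoryTable:
        return InMemoryTable(self._tables[name])


catalog = InMemoryCatalog(
    {
        "sales.orders": {"id": [1, 2]},
        "sales.items": {"sku": ["a"]},
        "hr.people": {"name": ["x"]},
    }
)
print(catalog.list_tables("sales"))
print(catalog.load_table("sales.orders").to_dataframe())
```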

---------

Co-authored-by: Jay Chia <[email protected]@users.noreply.github.com>
jaychia and Jay Chia authored Nov 17, 2024
1 parent 84db665 commit bb506a4
Showing 25 changed files with 1,021 additions and 79 deletions.
24 changes: 24 additions & 0 deletions Cargo.lock


10 changes: 10 additions & 0 deletions Cargo.toml
@@ -9,6 +9,8 @@ common-scan-info = {path = "src/common/scan-info", default-features = false}
common-system-info = {path = "src/common/system-info", default-features = false}
common-tracing = {path = "src/common/tracing", default-features = false}
common-version = {path = "src/common/version", default-features = false}
daft-catalog = {path = "src/daft-catalog", default-features = false}
daft-catalog-python-catalog = {path = "src/daft-catalog/python-catalog", optional = true}
daft-compression = {path = "src/daft-compression", default-features = false}
daft-connect = {path = "src/daft-connect", optional = true}
daft-core = {path = "src/daft-core", default-features = false}
@@ -52,6 +54,8 @@ python = [
"common-display/python",
"common-resource-request/python",
"common-system-info/python",
"daft-catalog/python",
"daft-catalog-python-catalog/python",
"daft-connect/python",
"daft-core/python",
"daft-csv/python",
@@ -75,6 +79,11 @@ python = [
"daft-writers/python",
"daft-table/python",
"dep:daft-connect",
"common-daft-config/python",
"common-system-info/python",
"common-display/python",
"common-resource-request/python",
"dep:daft-catalog-python-catalog",
"dep:pyo3",
"dep:pyo3-log"
]
@@ -140,6 +149,7 @@ members = [
"src/common/scan-info",
"src/common/system-info",
"src/common/treenode",
"src/daft-catalog",
"src/daft-core",
"src/daft-csv",
"src/daft-dsl",
3 changes: 3 additions & 0 deletions daft/__init__.py
@@ -58,6 +58,7 @@ def refresh_logger() -> None:
# Daft top-level imports
###

from daft.catalog import read_table, register_table
from daft.context import set_execution_config, set_planning_config, execution_config_ctx, planning_config_ctx
from daft.convert import (
from_arrow,
@@ -129,6 +130,8 @@ def refresh_logger() -> None:
"set_execution_config",
"planning_config_ctx",
"execution_config_ctx",
"read_table",
"register_table",
"sql",
"sql_expr",
"to_struct",
151 changes: 151 additions & 0 deletions daft/catalog/__init__.py
@@ -0,0 +1,151 @@
"""The `daft.catalog` module contains functionality for Data Catalogs.

A Data Catalog can be understood as a system/service for users to discover, access and query their data.
Most commonly, users' data is represented as a "table". Some more modern Data Catalogs such as Unity Catalog
also expose other types of data including files, ML models, registered functions and more.

Examples of Data Catalogs include AWS Glue, Hive Metastore, Apache Iceberg REST and Unity Catalog.

Daft manages Data Catalogs by registering them in an internal meta-catalog, called the "DaftMetaCatalog". This
is simply a collection of data catalogs, which Daft will attempt to detect from a user's current environment.

**Data Catalog**

Daft recognizes a default catalog which it will attempt to use when no specific catalog name is provided.

```python
# This will hit the default catalog
daft.read_table("my_db.my_namespace.my_table")
```

**Named Tables**

Daft also allows the registration of named tables, which have no catalog associated with them.

Note that named tables take precedence over the default catalog's table names when resolving names.

```python
df = daft.from_pydict({"foo": [1, 2, 3]})

daft.catalog.register_table(
    "my_table",
    df,
)

# Your table is now accessible from Daft-SQL, or Daft's `read_table`
df1 = daft.read_table("my_table")
df2 = daft.sql("SELECT * FROM my_table")
```
"""

from __future__ import annotations

from daft.daft import catalog as native_catalog
from daft.logical.builder import LogicalPlanBuilder

from daft.dataframe import DataFrame

_PYICEBERG_AVAILABLE = False
try:
    from pyiceberg.catalog import Catalog as PyIcebergCatalog

    _PYICEBERG_AVAILABLE = True
except ImportError:
    pass

_UNITY_AVAILABLE = False
try:
    from daft.unity_catalog import UnityCatalog

    _UNITY_AVAILABLE = True
except ImportError:
    pass

__all__ = [
    "read_table",
    "register_python_catalog",
    "unregister_catalog",
    "register_table",
]

# Forward imports from the native catalog which don't require Python wrappers
unregister_catalog = native_catalog.unregister_catalog


def read_table(name: str) -> DataFrame:
    """Finds a table with the specified name and reads it as a DataFrame

    The provided name can be any of the following, and Daft will return them with the following order of priority:

    1. Name of a registered dataframe/SQL view (manually registered using `daft.register_table`): `"my_registered_table"`
    2. Name of a table within the default catalog (without inputting the catalog name) for example: `"my.table.name"`
    3. Name of a fully-qualified table path with the catalog name for example: `"my_catalog.my.table.name"`

    Args:
        name: The identifier for the table to read

    Returns:
        A DataFrame containing the data from the specified table.
    """
    native_logical_plan_builder = native_catalog.read_table(name)
    return DataFrame(LogicalPlanBuilder(native_logical_plan_builder))


def register_table(name: str, dataframe: DataFrame) -> str:
    """Register a DataFrame as a named table.

    This function registers a DataFrame as a named table, making it accessible
    via Daft-SQL or Daft's `read_table` function.

    Args:
        name (str): The name to register the table under.
        dataframe (daft.DataFrame): The DataFrame to register as a table.

    Returns:
        str: The name of the registered table.

    Example:
        >>> df = daft.from_pydict({"foo": [1, 2, 3]})
        >>> daft.catalog.register_table("my_table", df)
        >>> daft.read_table("my_table")
    """
    return native_catalog.register_table(name, dataframe._builder._builder)


def register_python_catalog(catalog: PyIcebergCatalog | UnityCatalog, name: str | None = None) -> str:
    """Registers a Python catalog with Daft

    Currently supports:

    * [PyIceberg Catalogs](https://py.iceberg.apache.org/api/)
    * [Unity Catalog](https://www.getdaft.io/projects/docs/en/latest/user_guide/integrations/unity-catalog.html)

    Args:
        catalog (PyIcebergCatalog | UnityCatalog): The Python catalog to register.
        name (str | None, optional): The name to register the catalog under. If None, this catalog is registered as the default catalog.

    Returns:
        str: The name of the registered catalog.

    Raises:
        ValueError: If an unsupported catalog type is provided.

    Example:
        >>> from pyiceberg.catalog import load_catalog
        >>> catalog = load_catalog("my_catalog")
        >>> daft.catalog.register_python_catalog(catalog, "my_daft_catalog")
    """
    python_catalog: PyIcebergCatalog
    if _PYICEBERG_AVAILABLE and isinstance(catalog, PyIcebergCatalog):
        from daft.catalog.pyiceberg import PyIcebergCatalogAdaptor

        python_catalog = PyIcebergCatalogAdaptor(catalog)
    elif _UNITY_AVAILABLE and isinstance(catalog, UnityCatalog):
        from daft.catalog.unity import UnityCatalogAdaptor

        python_catalog = UnityCatalogAdaptor(catalog)
    else:
        raise ValueError(f"Unsupported catalog type: {type(catalog)}")

    return native_catalog.register_python_catalog(python_catalog, name)
32 changes: 32 additions & 0 deletions daft/catalog/pyiceberg.py
@@ -0,0 +1,32 @@
from __future__ import annotations

from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from pyiceberg.catalog import Catalog as PyIcebergCatalog
    from pyiceberg.table import Table as PyIcebergTable

    from daft.dataframe import DataFrame

from daft.catalog.python_catalog import PythonCatalog, PythonCatalogTable


class PyIcebergCatalogAdaptor(PythonCatalog):
    def __init__(self, pyiceberg_catalog: PyIcebergCatalog):
        self._catalog = pyiceberg_catalog

    def list_tables(self, prefix: str) -> list[str]:
        return [".".join(tup) for tup in self._catalog.list_tables(prefix)]

    def load_table(self, name: str) -> PyIcebergTableAdaptor:
        return PyIcebergTableAdaptor(self._catalog.load_table(name))


class PyIcebergTableAdaptor(PythonCatalogTable):
    def __init__(self, pyiceberg_table: PyIcebergTable):
        self._table = pyiceberg_table

    def to_dataframe(self) -> DataFrame:
        import daft

        return daft.read_iceberg(self._table)
24 changes: 24 additions & 0 deletions daft/catalog/python_catalog.py
@@ -0,0 +1,24 @@
from __future__ import annotations

from abc import abstractmethod
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from daft.dataframe import DataFrame


class PythonCatalog:
    """Wrapper class for various Python implementations of Data Catalogs"""

    @abstractmethod
    def list_tables(self, prefix: str) -> list[str]: ...

    @abstractmethod
    def load_table(self, name: str) -> PythonCatalogTable: ...


class PythonCatalogTable:
    """Wrapper class for various Python implementations of Data Catalog Tables"""

    @abstractmethod
    def to_dataframe(self) -> DataFrame: ...
49 changes: 49 additions & 0 deletions daft/catalog/unity.py
@@ -0,0 +1,49 @@
from __future__ import annotations

from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from daft.dataframe import DataFrame
    from daft.unity_catalog import UnityCatalog, UnityCatalogTable

from daft.catalog.python_catalog import PythonCatalog, PythonCatalogTable


class UnityCatalogAdaptor(PythonCatalog):
    def __init__(self, unity_catalog: UnityCatalog):
        self._catalog = unity_catalog

    def list_tables(self, prefix: str) -> list[str]:
        num_namespaces = prefix.count(".")
        if prefix == "":
            return [
                tbl
                for cat in self._catalog.list_catalogs()
                for schema in self._catalog.list_schemas(cat)
                for tbl in self._catalog.list_tables(schema)
            ]
        elif num_namespaces == 0:
            catalog_name = prefix
            return [
                tbl for schema in self._catalog.list_schemas(catalog_name) for tbl in self._catalog.list_tables(schema)
            ]
        elif num_namespaces == 1:
            schema_name = prefix
            return [tbl for tbl in self._catalog.list_tables(schema_name)]
        else:
            raise ValueError(
                f"Unrecognized catalog name or schema name, expected a '.'-separated namespace but received: {prefix}"
            )

    def load_table(self, name: str) -> UnityTableAdaptor:
        return UnityTableAdaptor(self._catalog.load_table(name))


class UnityTableAdaptor(PythonCatalogTable):
    def __init__(self, unity_table: UnityCatalogTable):
        self._table = unity_table

    def to_dataframe(self) -> DataFrame:
        import daft

        return daft.read_deltalake(self._table)
11 changes: 11 additions & 0 deletions daft/daft/catalog.pyi
@@ -0,0 +1,11 @@
from typing import TYPE_CHECKING

from daft.daft import LogicalPlanBuilder as PyLogicalPlanBuilder

if TYPE_CHECKING:
    from daft.catalog.python_catalog import PythonCatalog

def read_table(name: str) -> PyLogicalPlanBuilder: ...
def register_table(name: str, plan_builder: PyLogicalPlanBuilder) -> str: ...
def register_python_catalog(catalog: PythonCatalog, catalog_name: str | None) -> str: ...
def unregister_catalog(catalog_name: str | None) -> bool: ...
9 changes: 9 additions & 0 deletions daft/io/catalog.py
@@ -34,6 +34,15 @@ class DataCatalogTable:
    table_name: str
    catalog_id: Optional[str] = None

    def __post_init__(self):
        import warnings

        warnings.warn(
            "This API will soon be deprecated. Users should use the new functionality in daft.catalog.",
            DeprecationWarning,
            stacklevel=2,
        )

    def table_uri(self, io_config: IOConfig) -> str:
        """
        Get the URI of the table in the data catalog.
2 changes: 2 additions & 0 deletions src/common/error/src/error.rs
@@ -50,6 +52,8 @@ pub enum DaftError {
    FromUtf8Error(#[from] std::string::FromUtf8Error),
    #[error("Not Yet Implemented: {0}")]
    NotImplemented(String),
    #[error("DaftError::CatalogError {0}")]
    CatalogError(String),
}

impl DaftError {