Merge branch 'main' of https://github.com/vicky1999/Daft into vicky1999/#2769-list-length
vicky1999 committed Oct 5, 2024
2 parents eb659e1 + f4d1da2 commit 24ce259
Showing 97 changed files with 4,050 additions and 1,763 deletions.
1 change: 1 addition & 0 deletions .git-blame-ignore-revs
@@ -1 +1,2 @@
d5e444d0a71409ae3701d4249ad877f1fb9e2235 # introduced `rustfmt.toml` and ran formatter; ignoring large formatting changes
45e2944e252ccdd563dc20edd9b29762e05cec1d # auto-fix prefer `Self` over explicit type
2 changes: 1 addition & 1 deletion .github/workflows/python-package.yml
@@ -754,7 +754,7 @@ jobs:
channel: stable

- name: Install Machete
run: cargo +stable install cargo-machete
run: cargo +stable install cargo-machete@0.7.0 --locked
- name: Run Machete
run: cargo machete --with-metadata

4 changes: 4 additions & 0 deletions Cargo.toml
@@ -82,8 +82,12 @@ parquet2 = {path = "src/parquet2"}
debug = true

[profile.dev]
debug = "line-tables-only"
overflow-checks = false

[profile.dev.build-override]
opt-level = 3

[profile.dev-bench]
codegen-units = 16
debug = 1 # include symbols
21 changes: 18 additions & 3 deletions daft/daft/__init__.pyi
@@ -9,7 +9,7 @@ from daft.io.scan import ScanOperator
from daft.plan_scheduler.physical_plan_scheduler import PartitionT
from daft.runners.partitioning import PartitionCacheEntry
from daft.sql.sql_connection import SQLConnection
from daft.udf import PartialStatefulUDF, PartialStatelessUDF
from daft.udf import InitArgsType, PartialStatefulUDF, PartialStatelessUDF

if TYPE_CHECKING:
import pyarrow as pa
@@ -1150,12 +1150,14 @@ def stateful_udf(
expressions: list[PyExpr],
return_dtype: PyDataType,
resource_request: ResourceRequest | None,
init_args: tuple[tuple[Any, ...], dict[str, Any]] | None,
init_args: InitArgsType,
batch_size: int | None,
concurrency: int | None,
) -> PyExpr: ...
def check_column_name_validity(name: str, schema: PySchema): ...
def extract_partial_stateful_udf_py(expression: PyExpr) -> dict[str, PartialStatefulUDF]: ...
def extract_partial_stateful_udf_py(
expression: PyExpr,
) -> dict[str, tuple[PartialStatefulUDF, InitArgsType]]: ...
def bind_stateful_udfs(expression: PyExpr, initialized_funcs: dict[str, Callable]) -> PyExpr: ...
def resolve_expr(expr: PyExpr, schema: PySchema) -> tuple[PyExpr, PyField]: ...
def hash(expr: PyExpr, seed: Any | None = None) -> PyExpr: ...
@@ -1195,8 +1197,21 @@ def minhash(
ngram_size: int,
seed: int = 1,
) -> PyExpr: ...

# -----
# SQL functions
# -----
class SQLFunctionStub:
@property
def name(self) -> str: ...
@property
def docstring(self) -> str: ...
@property
def arg_names(self) -> list[str]: ...

def sql(sql: str, catalog: PyCatalog, daft_planning_config: PyDaftPlanningConfig) -> LogicalPlanBuilder: ...
def sql_expr(sql: str) -> PyExpr: ...
def list_sql_functions() -> list[SQLFunctionStub]: ...
def utf8_count_matches(expr: PyExpr, patterns: PyExpr, whole_words: bool, case_sensitive: bool) -> PyExpr: ...
def to_struct(inputs: list[PyExpr]) -> PyExpr: ...

7 changes: 6 additions & 1 deletion daft/iceberg/iceberg_write.py
@@ -4,6 +4,8 @@
from typing import TYPE_CHECKING, Any, Iterator, List, Tuple

from daft import Expression, col
from daft.datatype import DataType
from daft.io.common import _get_schema_from_dict
from daft.table import MicroPartition
from daft.table.partitioning import PartitionedTable, partition_strings_to_path

@@ -211,7 +213,10 @@ def visitor(self, partition_record: "IcebergRecord") -> "IcebergWriteVisitors.Fi
return self.FileVisitor(self, partition_record)

def to_metadata(self) -> MicroPartition:
return MicroPartition.from_pydict({"data_file": self.data_files})
col_name = "data_file"
if len(self.data_files) == 0:
return MicroPartition.empty(_get_schema_from_dict({col_name: DataType.python()}))
return MicroPartition.from_pydict({col_name: self.data_files})


def partitioned_table_to_iceberg_iter(
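The `to_metadata` change above, and the matching `to_metadata` change for Delta Lake in `daft/table/table_io.py` further down, apply the same fix: when nothing was written, return an explicitly typed empty MicroPartition instead of building one from an empty list. A minimal sketch of that shared pattern, using a hypothetical helper name that does not appear in this diff:

from daft.datatype import DataType
from daft.io.common import _get_schema_from_dict
from daft.table import MicroPartition


def _metadata_micropartition(col_name: str, rows: list) -> MicroPartition:
    # Illustrative helper only; the diff inlines this logic in each visitor's to_metadata().
    if len(rows) == 0:
        # With zero rows, return an empty MicroPartition with an explicit Python-object
        # column so downstream consumers still see the expected schema.
        return MicroPartition.empty(_get_schema_from_dict({col_name: DataType.python()}))
    return MicroPartition.from_pydict({col_name: rows})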
13 changes: 9 additions & 4 deletions daft/runners/pyrunner.py
@@ -138,10 +138,15 @@ def initialize_actor_global_state(uninitialized_projection: ExpressionsProjectio

logger.info("Initializing stateful UDFs: %s", ", ".join(partial_stateful_udfs.keys()))

# TODO: Account for Stateful Actor initialization arguments as well as user-provided batch_size
PyActorPool.initialized_stateful_udfs_process_singleton = {
name: partial_udf.func_cls() for name, partial_udf in partial_stateful_udfs.items()
}
PyActorPool.initialized_stateful_udfs_process_singleton = {}
for name, (partial_udf, init_args) in partial_stateful_udfs.items():
if init_args is None:
PyActorPool.initialized_stateful_udfs_process_singleton[name] = partial_udf.func_cls()
else:
args, kwargs = init_args
PyActorPool.initialized_stateful_udfs_process_singleton[name] = partial_udf.func_cls(
*args, **kwargs
)

@staticmethod
def build_partitions_with_stateful_project(
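This PyRunner change and the RayRunner change just below unpack the same `InitArgsType` value that `daft/udf.py` defines at the end of this diff: either `None`, or an `(args, kwargs)` tuple forwarded to the stateful UDF's constructor. A small sketch of that shape, with a made-up UDF class and argument values purely for illustration:

from typing import Any, Dict, Optional, Tuple

InitArgsType = Optional[Tuple[Tuple[Any, ...], Dict[str, Any]]]


class MyModelUDF:
    # Hypothetical user-defined stateful UDF class, used only for this example.
    def __init__(self, model_path: str, batch_size: int = 8):
        self.model_path = model_path
        self.batch_size = batch_size


init_args: InitArgsType = (("/models/weights.bin",), {"batch_size": 16})

if init_args is None:
    instance = MyModelUDF()  # no init arguments registered: call the constructor bare
else:
    args, kwargs = init_args
    instance = MyModelUDF(*args, **kwargs)  # mirrors partial_udf.func_cls(*args, **kwargs)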
11 changes: 8 additions & 3 deletions daft/runners/ray_runner.py
@@ -931,9 +931,14 @@ def __init__(self, daft_execution_config: PyDaftExecutionConfig, uninitialized_p
for name, psu in extract_partial_stateful_udf_py(expr._expr).items()
}
logger.info("Initializing stateful UDFs: %s", ", ".join(partial_stateful_udfs.keys()))
self.initialized_stateful_udfs = {
name: partial_udf.func_cls() for name, partial_udf in partial_stateful_udfs.items()
}

self.initialized_stateful_udfs = {}
for name, (partial_udf, init_args) in partial_stateful_udfs.items():
if init_args is None:
self.initialized_stateful_udfs[name] = partial_udf.func_cls()
else:
args, kwargs = init_args
self.initialized_stateful_udfs[name] = partial_udf.func_cls(*args, **kwargs)

@ray.method(num_returns=2)
def run(
30 changes: 30 additions & 0 deletions daft/sql/_sql_funcs.py
@@ -0,0 +1,30 @@
"""This module is used for Sphinx documentation only. We procedurally generate Python functions to allow
Sphinx to generate documentation pages for every SQL function.
"""

from __future__ import annotations

from inspect import Parameter as _Parameter
from inspect import Signature as _Signature

from daft.daft import list_sql_functions as _list_sql_functions


def _create_sql_function(func_name: str, docstring: str, arg_names: list[str]):
def sql_function(*args, **kwargs):
raise NotImplementedError("This function is for documentation purposes only and should not be called.")

sql_function.__name__ = func_name
sql_function.__qualname__ = func_name
sql_function.__doc__ = docstring
sql_function.__signature__ = _Signature([_Parameter(name, _Parameter.POSITIONAL_OR_KEYWORD) for name in arg_names]) # type: ignore[attr-defined]

# Register the function in the current module
globals()[func_name] = sql_function


__all__ = []

for sql_function_stub in _list_sql_functions():
_create_sql_function(sql_function_stub.name, sql_function_stub.docstring, sql_function_stub.arg_names)
__all__.append(sql_function_stub.name)
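A quick way to see what this module ends up exposing, assuming only the `SQLFunctionStub` interface declared in `daft/daft/__init__.pyi` above and that the module is importable as `daft.sql._sql_funcs`:

import inspect

import daft.sql._sql_funcs as sql_funcs
from daft.daft import list_sql_functions

for stub in list_sql_functions():
    # Every stub is mirrored by a documentation-only function registered in the module.
    generated = getattr(sql_funcs, stub.name)
    print(stub.name, inspect.signature(generated))  # signature is built from stub.arg_names

Calling any of these generated functions raises NotImplementedError; they exist only so Sphinx has a Python object to document.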
118 changes: 108 additions & 10 deletions daft/sql/sql.py
@@ -1,7 +1,7 @@
# isort: dont-add-import: from __future__ import annotations

import inspect
from typing import Optional, overload
from typing import Optional

from daft.api_annotations import PublicAPI
from daft.context import get_context
@@ -38,22 +38,120 @@ def _copy_from(self, other: "SQLCatalog") -> None:

@PublicAPI
def sql_expr(sql: str) -> Expression:
return Expression._from_pyexpr(_sql_expr(sql))

"""Parses a SQL string into a Daft Expression
@overload
def sql(sql: str) -> DataFrame: ...
This function allows you to create Daft Expressions from SQL snippets, which can then be used
in Daft operations or combined with other Daft Expressions.
Args:
sql (str): A SQL string to be parsed into a Daft Expression.
@overload
def sql(sql: str, catalog: SQLCatalog, register_globals: bool = ...) -> DataFrame: ...
Returns:
Expression: A Daft Expression representing the parsed SQL.
Examples:
Create a simple SQL expression:
>>> import daft
>>> expr = daft.sql_expr("1 + 2")
>>> print(expr)
lit(1) + lit(2)
Use SQL expression in a Daft DataFrame operation:
>>> df = daft.from_pydict({"a": [1, 2, 3], "b": [4, 5, 6]})
>>> df = df.with_column("c", daft.sql_expr("a + b"))
>>> df.show()
╭───────┬───────┬───────╮
│ a ┆ b ┆ c │
│ --- ┆ --- ┆ --- │
│ Int64 ┆ Int64 ┆ Int64 │
╞═══════╪═══════╪═══════╡
│ 1 ┆ 4 ┆ 5 │
├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┤
│ 2 ┆ 5 ┆ 7 │
├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┤
│ 3 ┆ 6 ┆ 9 │
╰───────┴───────┴───────╯
<BLANKLINE>
(Showing first 3 of 3 rows)
`daft.sql_expr` is also called automatically for you in some DataFrame operations such as filters:
>>> df = daft.from_pydict({"x": [1, 2, 3], "y": [4, 5, 6]})
>>> result = df.where("x < 3 AND y > 4")
>>> result.show()
╭───────┬───────╮
│ x ┆ y │
│ --- ┆ --- │
│ Int64 ┆ Int64 │
╞═══════╪═══════╡
│ 2 ┆ 5 │
╰───────┴───────╯
<BLANKLINE>
(Showing first 1 of 1 rows)
"""
return Expression._from_pyexpr(_sql_expr(sql))


@PublicAPI
def sql(sql: str, catalog: Optional[SQLCatalog] = None, register_globals: bool = True) -> DataFrame:
"""Create a DataFrame from an SQL query.
EXPERIMENTAL: This feature is early in development and will change.
"""Run a SQL query, returning the results as a DataFrame
.. WARNING::
This feature is early in development and will likely experience API changes.
Examples:
A simple example joining 2 dataframes together using a SQL statement, relying on Daft to detect the names of
SQL tables using their corresponding Python variable names.
>>> import daft
>>>
>>> df1 = daft.from_pydict({"a": [1, 2, 3], "b": ["foo", "bar", "baz"]})
>>> df2 = daft.from_pydict({"a": [1, 2, 3], "c": ["daft", None, None]})
>>>
>>> # Daft automatically detects `df1` and `df2` from your Python global namespace
>>> result_df = daft.sql("SELECT * FROM df1 JOIN df2 ON df1.a = df2.a")
>>> result_df.show()
╭───────┬──────┬──────╮
│ a ┆ b ┆ c │
│ --- ┆ --- ┆ --- │
│ Int64 ┆ Utf8 ┆ Utf8 │
╞═══════╪══════╪══════╡
│ 1 ┆ foo ┆ daft │
├╌╌╌╌╌╌╌┼╌╌╌╌╌╌┼╌╌╌╌╌╌┤
│ 2 ┆ bar ┆ None │
├╌╌╌╌╌╌╌┼╌╌╌╌╌╌┼╌╌╌╌╌╌┤
│ 3 ┆ baz ┆ None │
╰───────┴──────┴──────╯
<BLANKLINE>
(Showing first 3 of 3 rows)
A more complex example using a SQLCatalog to create a named table called `"my_table"`, which can then be referenced from inside your SQL statement.
>>> import daft
>>> from daft.sql import SQLCatalog
>>>
>>> df = daft.from_pydict({"a": [1, 2, 3], "b": ["foo", "bar", "baz"]})
>>>
>>> # Register dataframes as tables in SQL explicitly with names
>>> catalog = SQLCatalog({"my_table": df})
>>>
>>> daft.sql("SELECT a FROM my_table", catalog=catalog).show()
╭───────╮
│ a │
│ --- │
│ Int64 │
╞═══════╡
│ 1 │
├╌╌╌╌╌╌╌┤
│ 2 │
├╌╌╌╌╌╌╌┤
│ 3 │
╰───────╯
<BLANKLINE>
(Showing first 3 of 3 rows)
Args:
sql (str): SQL query to execute
22 changes: 15 additions & 7 deletions daft/table/table_io.py
@@ -22,13 +22,15 @@
PythonStorageConfig,
StorageConfig,
)
from daft.datatype import DataType
from daft.dependencies import pa, pacsv, pads, pajson, pq
from daft.expressions import ExpressionsProjection, col
from daft.filesystem import (
_resolve_paths_and_filesystem,
canonicalize_protocol,
get_protocol_from_path,
)
from daft.io.common import _get_schema_from_dict
from daft.logical.schema import Schema
from daft.runners.partitioning import (
TableParseCSVOptions,
@@ -426,16 +428,22 @@ def __call__(self, written_file):
self.parent.paths.append(written_file.path)
self.parent.partition_indices.append(self.idx)

def __init__(self, partition_values: MicroPartition | None, path_key: str = "path"):
def __init__(self, partition_values: MicroPartition | None, schema: Schema):
self.paths: list[str] = []
self.partition_indices: list[int] = []
self.partition_values = partition_values
self.path_key = path_key
self.path_key = schema.column_names()[
0
] # I kept this from our original code, but idk why it's the first column name -kevin
self.schema = schema

def visitor(self, partition_idx: int) -> TabularWriteVisitors.FileVisitor:
return self.FileVisitor(self, partition_idx)

def to_metadata(self) -> MicroPartition:
if len(self.paths) == 0:
return MicroPartition.empty(self.schema)

metadata: dict[str, Any] = {self.path_key: self.paths}

if self.partition_values:
@@ -488,10 +496,7 @@ def write_tabular(

partitioned = PartitionedTable(table, partition_cols)

# I kept this from our original code, but idk why it's the first column name -kevin
path_key = schema.column_names()[0]

visitors = TabularWriteVisitors(partitioned.partition_values(), path_key)
visitors = TabularWriteVisitors(partitioned.partition_values(), schema)

for i, (part_table, part_path) in enumerate(partitioned_table_to_hive_iter(partitioned, resolved_path)):
size_bytes = part_table.nbytes
@@ -686,7 +691,10 @@ def visitor(self, partition_values: dict[str, str | None]) -> DeltaLakeWriteVisi
return self.FileVisitor(self, partition_values)

def to_metadata(self) -> MicroPartition:
return MicroPartition.from_pydict({"add_action": self.add_actions})
col_name = "add_action"
if len(self.add_actions) == 0:
return MicroPartition.empty(_get_schema_from_dict({col_name: DataType.python()}))
return MicroPartition.from_pydict({col_name: self.add_actions})


def write_deltalake(
5 changes: 3 additions & 2 deletions daft/udf.py
@@ -4,7 +4,7 @@
import functools
import inspect
from abc import abstractmethod
from typing import Any, Callable, Union
from typing import Any, Callable, Dict, Optional, Tuple, Union

from daft.context import get_context
from daft.daft import PyDataType, ResourceRequest
@@ -13,6 +13,7 @@
from daft.expressions import Expression
from daft.series import PySeries, Series

InitArgsType = Optional[Tuple[Tuple[Any, ...], Dict[str, Any]]]
UserProvidedPythonFunction = Callable[..., Union[Series, "np.ndarray", list]]


@@ -294,7 +295,7 @@ class StatefulUDF(UDF):
name: str
cls: type
return_dtype: DataType
init_args: tuple[tuple[Any, ...], dict[str, Any]] | None = None
init_args: InitArgsType = None
concurrency: int | None = None

def __post_init__(self):