Merge branch 'Eventual-Inc:main' into main
sagiahrac authored Oct 7, 2024
2 parents 499f935 + 396c004 commit 21154e7
Showing 127 changed files with 5,331 additions and 1,835 deletions.
1 change: 1 addition & 0 deletions .git-blame-ignore-revs
@@ -1 +1,2 @@
d5e444d0a71409ae3701d4249ad877f1fb9e2235 # introduced `rustfmt.toml` and ran formatter; ignoring large formatting changes
+45e2944e252ccdd563dc20edd9b29762e05cec1d # auto-fix prefer `Self` over explicit type
2 changes: 1 addition & 1 deletion .github/workflows/python-package.yml
@@ -754,7 +754,7 @@ jobs:
channel: stable

- name: Install Machete
-run: cargo +stable install cargo-machete
+run: cargo +stable install cargo-machete@0.7.0 --locked
- name: Run Machete
run: cargo machete --with-metadata

2 changes: 2 additions & 0 deletions Cargo.lock

(Generated file; diff not rendered.)

4 changes: 4 additions & 0 deletions Cargo.toml
@@ -82,8 +82,12 @@ parquet2 = {path = "src/parquet2"}
debug = true

[profile.dev]
debug = "line-tables-only"
+overflow-checks = false
+
+[profile.dev.build-override]
+opt-level = 3

[profile.dev-bench]
codegen-units = 16
debug = 1 # include symbols
10 changes: 5 additions & 5 deletions README.rst
@@ -4,13 +4,13 @@

`Website <https://www.getdaft.io>`_ • `Docs <https://www.getdaft.io/projects/docs/>`_ • `Installation`_ • `10-minute tour of Daft <https://www.getdaft.io/projects/docs/en/latest/learn/10-min.html>`_ • `Community and Support <https://github.com/Eventual-Inc/Daft/discussions>`_

-Daft: Distributed dataframes for multimodal data
-=======================================================
+Daft: Unified Engine for Data Analytics, Engineering & ML/AI
+============================================================


-`Daft <https://www.getdaft.io>`_ is a distributed query engine for large-scale data processing in Python and is implemented in Rust.
+`Daft <https://www.getdaft.io>`_ is a distributed query engine for large-scale data processing using Python or SQL, implemented in Rust.

-* **Familiar interactive API:** Lazy Python Dataframe for rapid and interactive iteration
+* **Familiar interactive API:** Lazy Python Dataframe for rapid and interactive iteration, or SQL for analytical queries
* **Focus on the what:** Powerful Query Optimizer that rewrites queries to be as efficient as possible
* **Data Catalog integrations:** Full integration with data catalogs such as Apache Iceberg
* **Rich multimodal type-system:** Supports multimodal types such as Images, URLs, Tensors and more
@@ -51,7 +51,7 @@ Quickstart

In this example, we load images from an AWS S3 bucket's URLs and resize each image in the dataframe:

-.. code::
+.. code:: python
import daft
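
The code block above is truncated in this view. As a rough sketch of the pipeline the quickstart describes (the bucket name and column values here are hypothetical, but `url.download`, `image.decode`, and `image.resize` are documented Daft expression namespaces):

    import daft

    # Load image URLs, fetch and decode the bytes, then resize each image
    df = daft.from_pydict({"urls": ["s3://my-bucket/cat.png", "s3://my-bucket/dog.png"]})
    df = df.with_column("image", df["urls"].url.download().image.decode())
    df = df.with_column("thumbnail", df["image"].image.resize(32, 32))
    df.show()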
21 changes: 18 additions & 3 deletions daft/daft/__init__.pyi
@@ -9,7 +9,7 @@ from daft.io.scan import ScanOperator
from daft.plan_scheduler.physical_plan_scheduler import PartitionT
from daft.runners.partitioning import PartitionCacheEntry
from daft.sql.sql_connection import SQLConnection
-from daft.udf import PartialStatefulUDF, PartialStatelessUDF
+from daft.udf import InitArgsType, PartialStatefulUDF, PartialStatelessUDF

if TYPE_CHECKING:
import pyarrow as pa
@@ -1150,12 +1150,14 @@ def stateful_udf(
expressions: list[PyExpr],
return_dtype: PyDataType,
resource_request: ResourceRequest | None,
-init_args: tuple[tuple[Any, ...], dict[str, Any]] | None,
+init_args: InitArgsType,
batch_size: int | None,
concurrency: int | None,
) -> PyExpr: ...
def check_column_name_validity(name: str, schema: PySchema): ...
-def extract_partial_stateful_udf_py(expression: PyExpr) -> dict[str, PartialStatefulUDF]: ...
+def extract_partial_stateful_udf_py(
+    expression: PyExpr,
+) -> dict[str, tuple[PartialStatefulUDF, InitArgsType]]: ...
def bind_stateful_udfs(expression: PyExpr, initialized_funcs: dict[str, Callable]) -> PyExpr: ...
def resolve_expr(expr: PyExpr, schema: PySchema) -> tuple[PyExpr, PyField]: ...
def hash(expr: PyExpr, seed: Any | None = None) -> PyExpr: ...
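
The `init_args` parameter threaded through these bindings carries constructor arguments for stateful (class-based) UDFs. A hedged sketch of the user-level pattern this enables, assuming Daft's `@daft.udf` class decorator and a `with_init_args` hook (the hook name is inferred from this change, not confirmed by the diff):

    import daft
    from daft import DataType

    @daft.udf(return_dtype=DataType.string())
    class Greeter:
        def __init__(self, greeting: str = "hello"):
            # Runs once per actor; receives the bound init_args
            self.greeting = greeting

        def __call__(self, names):
            return [f"{self.greeting}, {n}" for n in names.to_pylist()]

    df = daft.from_pydict({"name": ["alice", "bob"]})
    df = df.with_column("greeting", Greeter.with_init_args(greeting="hi")(df["name"]))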
@@ -1195,8 +1197,21 @@ def minhash(
ngram_size: int,
seed: int = 1,
) -> PyExpr: ...

+# -----
+# SQL functions
+# -----
+class SQLFunctionStub:
+    @property
+    def name(self) -> str: ...
+    @property
+    def docstring(self) -> str: ...
+    @property
+    def arg_names(self) -> list[str]: ...
+
def sql(sql: str, catalog: PyCatalog, daft_planning_config: PyDaftPlanningConfig) -> LogicalPlanBuilder: ...
def sql_expr(sql: str) -> PyExpr: ...
+def list_sql_functions() -> list[SQLFunctionStub]: ...
def utf8_count_matches(expr: PyExpr, patterns: PyExpr, whole_words: bool, case_sensitive: bool) -> PyExpr: ...
def to_struct(inputs: list[PyExpr]) -> PyExpr: ...
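
These stubs expose Daft's SQL surface to Python tooling. A minimal sketch of the intended entry points, assuming the top-level `daft.sql` wrapper routes through the `sql` binding declared above:

    import daft

    df = daft.from_pydict({"a": [1, 2, 3]})

    # daft.sql can reference in-scope dataframes by their variable names
    result = daft.sql("SELECT a, a + 1 AS b FROM df")
    result.show()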

1 change: 1 addition & 0 deletions daft/execution/physical_plan.py
@@ -241,6 +241,7 @@ def actor_pool_project(
with get_context().runner().actor_pool_context(
actor_pool_name,
actor_resource_request,
+task_resource_request,
num_actors,
projection,
) as actor_pool_id:
16 changes: 16 additions & 0 deletions daft/expressions/expressions.py
@@ -2,6 +2,7 @@

import math
import os
+import warnings
from datetime import date, datetime, time
from decimal import Decimal
from typing import (
@@ -2936,6 +2937,21 @@ def count(self, mode: CountMode = CountMode.Valid) -> Expression:
def lengths(self) -> Expression:
"""Gets the length of each list
+(DEPRECATED) Please use Expression.list.length instead
Returns:
Expression: a UInt64 expression which is the length of each list
"""
+warnings.warn(
+    "This function will be deprecated from Daft version >= 0.3.5! Instead, please use 'Expression.list.length'",
+    category=DeprecationWarning,
+)
+
return Expression._from_pyexpr(native.list_count(self._expr, CountMode.All))

+def length(self) -> Expression:
+    """Gets the length of each list
+    Returns:
+        Expression: a UInt64 expression which is the length of each list
+    """
7 changes: 6 additions & 1 deletion daft/iceberg/iceberg_write.py
@@ -4,6 +4,8 @@
from typing import TYPE_CHECKING, Any, Iterator, List, Tuple

from daft import Expression, col
+from daft.datatype import DataType
+from daft.io.common import _get_schema_from_dict
from daft.table import MicroPartition
from daft.table.partitioning import PartitionedTable, partition_strings_to_path

@@ -211,7 +213,10 @@ def visitor(self, partition_record: "IcebergRecord") -> "IcebergWriteVisitors.FileVisitor":
return self.FileVisitor(self, partition_record)

def to_metadata(self) -> MicroPartition:
return MicroPartition.from_pydict({"data_file": self.data_files})
col_name = "data_file"
if len(self.data_files) == 0:
return MicroPartition.empty(_get_schema_from_dict({col_name: DataType.python()}))
return MicroPartition.from_pydict({col_name: self.data_files})


def partitioned_table_to_iceberg_iter(
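
The empty-case branch matters because `MicroPartition.from_pydict` has no values to infer a type from when no data files were written; pinning an explicit Python-object schema presumably keeps the "data_file" column's type stable for downstream consumers. The same idea in isolation, assuming these helpers behave as they are used above:

    from daft.datatype import DataType
    from daft.io.common import _get_schema_from_dict
    from daft.table import MicroPartition

    def data_files_to_metadata(data_files: list) -> MicroPartition:
        # With no files, build an empty table with an explicit Python-object
        # column instead of letting the dtype degrade
        if not data_files:
            return MicroPartition.empty(_get_schema_from_dict({"data_file": DataType.python()}))
        return MicroPartition.from_pydict({"data_file": data_files})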
28 changes: 20 additions & 8 deletions daft/runners/pyrunner.py
@@ -138,10 +138,15 @@ def initialize_actor_global_state(uninitialized_projection: ExpressionsProjection):

logger.info("Initializing stateful UDFs: %s", ", ".join(partial_stateful_udfs.keys()))

-# TODO: Account for Stateful Actor initialization arguments as well as user-provided batch_size
-PyActorPool.initialized_stateful_udfs_process_singleton = {
-    name: partial_udf.func_cls() for name, partial_udf in partial_stateful_udfs.items()
-}
+PyActorPool.initialized_stateful_udfs_process_singleton = {}
+for name, (partial_udf, init_args) in partial_stateful_udfs.items():
+    if init_args is None:
+        PyActorPool.initialized_stateful_udfs_process_singleton[name] = partial_udf.func_cls()
+    else:
+        args, kwargs = init_args
+        PyActorPool.initialized_stateful_udfs_process_singleton[name] = partial_udf.func_cls(
+            *args, **kwargs
+        )

@staticmethod
def build_partitions_with_stateful_project(
@@ -332,20 +337,27 @@ def run_iter_tables(

@contextlib.contextmanager
def actor_pool_context(
-self, name: str, resource_request: ResourceRequest, num_actors: int, projection: ExpressionsProjection
+self,
+name: str,
+actor_resource_request: ResourceRequest,
+_task_resource_request: ResourceRequest,
+num_actors: int,
+projection: ExpressionsProjection,
) -> Iterator[str]:
actor_pool_id = f"py_actor_pool-{name}"

-total_resource_request = resource_request * num_actors
+total_resource_request = actor_resource_request * num_actors
admitted = self._attempt_admit_task(total_resource_request)

if not admitted:
raise RuntimeError(
f"Not enough resources available to admit {num_actors} actors, each with resource request: {resource_request}"
f"Not enough resources available to admit {num_actors} actors, each with resource request: {actor_resource_request}"
)

try:
-self._actor_pools[actor_pool_id] = PyActorPool(actor_pool_id, num_actors, resource_request, projection)
+self._actor_pools[actor_pool_id] = PyActorPool(
+    actor_pool_id, num_actors, actor_resource_request, projection
+)
self._actor_pools[actor_pool_id].setup()
logger.debug("Created actor pool %s with resources: %s", actor_pool_id, total_resource_request)
yield actor_pool_id
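
Both runners now construct stateful UDF classes the same way. The pattern, reduced to a standalone sketch (per the type in daft/daft/__init__.pyi, `init_args` is either None or an `(args, kwargs)` tuple):

    from typing import Any, Callable, Optional, Tuple

    InitArgs = Optional[Tuple[Tuple[Any, ...], dict]]

    def construct_stateful_udf(func_cls: Callable[..., Any], init_args: InitArgs) -> Any:
        # No init_args: default-construct; otherwise unpack (args, kwargs)
        if init_args is None:
            return func_cls()
        args, kwargs = init_args
        return func_cls(*args, **kwargs)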
27 changes: 22 additions & 5 deletions daft/runners/ray_runner.py
@@ -931,9 +931,14 @@ def __init__(self, daft_execution_config: PyDaftExecutionConfig, uninitialized_projection: ExpressionsProjection):
for name, psu in extract_partial_stateful_udf_py(expr._expr).items()
}
logger.info("Initializing stateful UDFs: %s", ", ".join(partial_stateful_udfs.keys()))
-self.initialized_stateful_udfs = {
-    name: partial_udf.func_cls() for name, partial_udf in partial_stateful_udfs.items()
-}
+
+self.initialized_stateful_udfs = {}
+for name, (partial_udf, init_args) in partial_stateful_udfs.items():
+    if init_args is None:
+        self.initialized_stateful_udfs[name] = partial_udf.func_cls()
+    else:
+        args, kwargs = init_args
+        self.initialized_stateful_udfs[name] = partial_udf.func_cls(*args, **kwargs)

@ray.method(num_returns=2)
def run(
@@ -981,8 +986,12 @@ def __init__(
self._projection = projection

def setup(self) -> None:
+ray_options = _get_ray_task_options(self._resource_request_per_actor)
+
self._actors = [
DaftRayActor.options(name=f"rank={rank}-{self._id}").remote(self._execution_config, self._projection) # type: ignore
DaftRayActor.options(name=f"rank={rank}-{self._id}", **ray_options).remote( # type: ignore
self._execution_config, self._projection
)
for rank in range(self._num_actors)
]

@@ -1150,8 +1159,16 @@ def run_iter_tables(

@contextlib.contextmanager
def actor_pool_context(
-self, name: str, resource_request: ResourceRequest, num_actors: PartID, projection: ExpressionsProjection
+self,
+name: str,
+actor_resource_request: ResourceRequest,
+task_resource_request: ResourceRequest,
+num_actors: PartID,
+projection: ExpressionsProjection,
) -> Iterator[str]:
+# Ray runs actor methods serially, so the resource request for an actor should be both the actor's resources and the task's resources
+resource_request = actor_resource_request + task_resource_request
+
execution_config = get_context().daft_execution_config
if self.ray_client_mode:
try:
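
The `.options(**ray_options)` change above attaches per-actor resource reservations. A self-contained sketch of that Ray pattern (here `ray_options` is hand-written; in the diff it comes from `_get_ray_task_options`):

    import ray

    ray.init(ignore_reinit_error=True)

    @ray.remote
    class EchoActor:
        def run(self, x):
            return x

    # name= mirrors the rank-based naming above; resource keys such as
    # num_cpus reserve capacity for the actor and its serially-run methods
    ray_options = {"num_cpus": 1}
    actor = EchoActor.options(name="rank=0-demo", **ray_options).remote()
    assert ray.get(actor.run.remote(42)) == 42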
3 changes: 2 additions & 1 deletion daft/runners/runner.py
@@ -67,7 +67,8 @@ def run_iter_tables(
def actor_pool_context(
self,
name: str,
-resource_request: ResourceRequest,
+actor_resource_request: ResourceRequest,
+task_resource_request: ResourceRequest,
num_actors: int,
projection: ExpressionsProjection,
) -> Iterator[str]:
9 changes: 9 additions & 0 deletions daft/series.py
@@ -1,5 +1,6 @@
from __future__ import annotations

+import warnings
from typing import Any, Literal, TypeVar

from daft.arrow_utils import ensure_array, ensure_chunked_array
@@ -927,6 +928,14 @@ def iceberg_truncate(self, w: int) -> Series:

class SeriesListNamespace(SeriesNamespace):
def lengths(self) -> Series:
+warnings.warn(
+    "This function will be deprecated from Daft version >= 0.3.5! Instead, please use 'length'",
+    category=DeprecationWarning,
+)
+
return Series._from_pyseries(self._series.list_count(CountMode.All))
+
+def length(self) -> Series:
+    return Series._from_pyseries(self._series.list_count(CountMode.All))

def get(self, idx: Series, default: Series) -> Series:
30 changes: 30 additions & 0 deletions daft/sql/_sql_funcs.py
@@ -0,0 +1,30 @@
"""This module is used for Sphinx documentation only. We procedurally generate Python functions to allow
Sphinx to generate documentation pages for every SQL function.
"""

from __future__ import annotations

from inspect import Parameter as _Parameter
from inspect import Signature as _Signature

from daft.daft import list_sql_functions as _list_sql_functions


def _create_sql_function(func_name: str, docstring: str, arg_names: list[str]):
def sql_function(*args, **kwargs):
raise NotImplementedError("This function is for documentation purposes only and should not be called.")

sql_function.__name__ = func_name
sql_function.__qualname__ = func_name
sql_function.__doc__ = docstring
sql_function.__signature__ = _Signature([_Parameter(name, _Parameter.POSITIONAL_OR_KEYWORD) for name in arg_names]) # type: ignore[attr-defined]

# Register the function in the current module
globals()[func_name] = sql_function


__all__ = []

for sql_function_stub in _list_sql_functions():
_create_sql_function(sql_function_stub.name, sql_function_stub.docstring, sql_function_stub.arg_names)
__all__.append(sql_function_stub.name)
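
The stub-synthesis trick used here, shown in isolation (`make_stub` and the sample inputs are hypothetical): setting `__signature__` is what lets Sphinx and `help()` render real parameter names for a function created at runtime.

    from inspect import Parameter, Signature

    def make_stub(name: str, doc: str, arg_names: list):
        def stub(*args, **kwargs):
            raise NotImplementedError("documentation-only stub; do not call")

        stub.__name__ = stub.__qualname__ = name
        stub.__doc__ = doc
        stub.__signature__ = Signature(
            [Parameter(a, Parameter.POSITIONAL_OR_KEYWORD) for a in arg_names]
        )
        return stub

    upper = make_stub("upper", "Uppercase a UTF-8 string column.", ["input"])
    help(upper)  # renders: upper(input) plus the docstring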
(Remaining changed files not rendered in this view.)
