Commit d8cddf1
Merge branch main into colin/compute-pool
Colin Ho authored and Colin Ho committed Oct 9, 2024
2 parents 84810f2 + 73ff3f3
Showing 387 changed files with 8,442 additions and 3,598 deletions.
2 changes: 2 additions & 0 deletions .cargo/config.toml
@@ -0,0 +1,2 @@
[env]
PYO3_PYTHON = "./.venv/bin/python"
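PYO3_PYTHON is the environment variable PyO3 reads at build time to select the Python interpreter, so this pins cargo builds to the repository's local virtualenv rather than whatever python happens to be on PATH.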
1 change: 1 addition & 0 deletions .git-blame-ignore-revs
@@ -1 +1,2 @@
d5e444d0a71409ae3701d4249ad877f1fb9e2235 # introduced `rustfmt.toml` and ran formatter; ignoring large formatting changes
45e2944e252ccdd563dc20edd9b29762e05cec1d # auto-fix prefer `Self` over explicit type
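These revisions can be skipped in local blame output with `git config blame.ignoreRevsFile .git-blame-ignore-revs`; GitHub's blame view picks the file up automatically.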
5 changes: 5 additions & 0 deletions .github/workflows/python-package.yml
@@ -545,6 +545,10 @@ jobs:
matrix:
python-version: ['3.8'] # can't use 3.7 due to requiring anon mode for adlfs
daft-runner: [py, ray]
enable-native-executor: [0, 1]
exclude:
- daft-runner: ray
enable-native-executor: 1
steps:
- uses: actions/checkout@v4
with:
@@ -583,6 +587,7 @@ jobs:
pytest tests/integration/sql -m 'integration or not integration' --durations=50
env:
DAFT_RUNNER: ${{ matrix.daft-runner }}
DAFT_ENABLE_NATIVE_EXECUTOR: ${{ matrix.enable-native-executor }}
- name: Send Slack notification on failure
uses: slackapi/[email protected]
if: ${{ failure() && (github.ref == 'refs/heads/main') }}
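A hedged, minimal sketch of flipping the same switches in-process that the CI matrix sets above. Whether daft reads these variables at import time or at execution time is an assumption here, so they are set before the import:

```python
import os

# Same knobs the workflow matrix sets; the native executor is only
# exercised on the py runner (the ray combination is excluded above).
os.environ["DAFT_RUNNER"] = "py"
os.environ["DAFT_ENABLE_NATIVE_EXECUTOR"] = "1"  # assumed to be read before/at first use

import daft

df = daft.from_pydict({"x": [1, 2, 3]})
print(df.collect())
```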
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

69 changes: 69 additions & 0 deletions Cargo.toml
@@ -229,7 +229,76 @@ features = ["derive", "rc"]
version = "1.0.200"

[workspace.lints.clippy]
as_conversions = "allow"
cast-sign-loss = "allow"
cast_lossless = "allow"
cast_possible_truncation = "allow"
cast_possible_wrap = "allow"
cast_precision_loss = "allow"
cognitive_complexity = "allow"
default_trait_access = "allow"
doc-markdown = "allow"
doc_link_with_quotes = "allow"
enum_glob_use = "allow"
float_cmp = "allow"
fn_params_excessive_bools = "allow"
from_iter_instead_of_collect = "allow"
future_not_send = "allow"
if_not_else = "allow"
implicit_hasher = "allow"
inline_always = "allow"
into_iter_without_iter = "allow"
items_after_statements = "allow"
iter_with_drain = "allow" # REMOVE
iter_without_into_iter = "allow"
manual_let_else = "allow"
many_single_char_names = "allow"
map_unwrap_or = "allow"
match_bool = "allow"
match_same_arms = "allow"
match_wildcard_for_single_variants = "allow"
missing-panics-doc = "allow"
missing_const_for_fn = "allow"
missing_errors_doc = "allow"
module_name_repetitions = "allow"
must_use_candidate = "allow"
needless_pass_by_value = "allow"
needless_return = "allow"
nonminimal_bool = "allow"
nursery = {level = "deny", priority = -1}
only_used_in_recursion = "allow"
option_if_let_else = "allow"
pedantic = {level = "deny", priority = -1}
perf = {level = "deny", priority = -1}
redundant_closure = "allow"
redundant_closure_for_method_calls = "allow"
redundant_else = "allow"
redundant_pub_crate = "allow"
return_self_not_must_use = "allow"
significant_drop_in_scrutinee = "allow" # REMOVE
significant_drop_tightening = "allow" # REMOVE
similar_names = "allow"
single_match = "allow"
single_match_else = "allow"
struct_excessive_bools = "allow"
style = {level = "deny", priority = 1}
suspicious_operation_groupings = "allow"
too_many_lines = "allow"
trivially_copy_pass_by_ref = "allow"
type_repetition_in_bounds = "allow"
uninlined_format_args = "allow"
unnecessary_wraps = "allow"
unnested_or_patterns = "allow"
unreadable_literal = "allow"
# todo: remove?
unsafe_derive_deserialize = "allow"
unused_async = "allow"
# used_underscore_items = "allow" # REMOVE
unused_self = "allow"
use-self = "deny"
used_underscore_binding = "allow" # REMOVE REMOVE
wildcard_imports = "allow"
zero_sized_map_values = "allow"
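Member crates inherit this table by opting in with `[lints] workspace = true` in their own Cargo.toml; the broad groups (`nursery`, `pedantic`, `perf`) are pinned at `priority = -1` so the individual lint entries above (default priority 0) can override them.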

[workspace.package]
edition = "2021"
10 changes: 5 additions & 5 deletions README.rst
@@ -4,13 +4,13 @@

`Website <https://www.getdaft.io>`_ • `Docs <https://www.getdaft.io/projects/docs/>`_ • `Installation`_ • `10-minute tour of Daft <https://www.getdaft.io/projects/docs/en/latest/learn/10-min.html>`_ • `Community and Support <https://github.com/Eventual-Inc/Daft/discussions>`_

Daft: Distributed dataframes for multimodal data
=======================================================
Daft: Unified Engine for Data Analytics, Engineering & ML/AI
============================================================


`Daft <https://www.getdaft.io>`_ is a distributed query engine for large-scale data processing in Python and is implemented in Rust.
`Daft <https://www.getdaft.io>`_ is a distributed query engine for large-scale data processing using Python or SQL, implemented in Rust.

* **Familiar interactive API:** Lazy Python Dataframe for rapid and interactive iteration
* **Familiar interactive API:** Lazy Python Dataframe for rapid and interactive iteration, or SQL for analytical queries
* **Focus on the what:** Powerful Query Optimizer that rewrites queries to be as efficient as possible
* **Data Catalog integrations:** Full integration with data catalogs such as Apache Iceberg
* **Rich multimodal type-system:** Supports multimodal types such as Images, URLs, Tensors and more
@@ -51,7 +51,7 @@ Quickstart

In this example, we load images from an AWS S3 bucket's URLs and resize each image in the dataframe:

.. code::
.. code:: python
import daft
14 changes: 13 additions & 1 deletion daft/daft/__init__.pyi
@@ -1051,6 +1051,7 @@ class PyExpr:
def approx_count_distinct(self) -> PyExpr: ...
def approx_percentiles(self, percentiles: float | list[float]) -> PyExpr: ...
def mean(self) -> PyExpr: ...
def stddev(self) -> PyExpr: ...
def min(self) -> PyExpr: ...
def max(self) -> PyExpr: ...
def any_value(self, ignore_nulls: bool) -> PyExpr: ...
@@ -1134,6 +1135,7 @@ def lit(item: Any) -> PyExpr: ...
def date_lit(item: int) -> PyExpr: ...
def time_lit(item: int, tu: PyTimeUnit) -> PyExpr: ...
def timestamp_lit(item: int, tu: PyTimeUnit, tz: str | None) -> PyExpr: ...
def duration_lit(item: int, tu: PyTimeUnit) -> PyExpr: ...
def decimal_lit(sign: bool, digits: tuple[int, ...], exp: int) -> PyExpr: ...
def series_lit(item: PySeries) -> PyExpr: ...
def stateless_udf(
@@ -1201,9 +1203,17 @@ def minhash(
# -----
# SQL functions
# -----
class SQLFunctionStub:
@property
def name(self) -> str: ...
@property
def docstring(self) -> str: ...
@property
def arg_names(self) -> list[str]: ...

def sql(sql: str, catalog: PyCatalog, daft_planning_config: PyDaftPlanningConfig) -> LogicalPlanBuilder: ...
def sql_expr(sql: str) -> PyExpr: ...
def list_sql_functions() -> list[str]: ...
def list_sql_functions() -> list[SQLFunctionStub]: ...
def utf8_count_matches(expr: PyExpr, patterns: PyExpr, whole_words: bool, case_sensitive: bool) -> PyExpr: ...
def to_struct(inputs: list[PyExpr]) -> PyExpr: ...
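Based only on the stub above, a hedged sketch of enumerating the SQL function registry — `list_sql_functions()` now returns rich `SQLFunctionStub` objects rather than bare names (the `daft.daft` module path follows this .pyi file; runtime availability is assumed):

```python
from daft.daft import list_sql_functions

# Each entry exposes a name, a docstring, and its argument names.
for fn in list_sql_functions():
    print(f"{fn.name}({', '.join(fn.arg_names)}): {fn.docstring}")
```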

@@ -1275,6 +1285,7 @@ def dt_truncate(expr: PyExpr, interval: str, relative_to: PyExpr) -> PyExpr: ...
# ---
def explode(expr: PyExpr) -> PyExpr: ...
def list_sort(expr: PyExpr, desc: PyExpr) -> PyExpr: ...
def list_value_counts(expr: PyExpr) -> PyExpr: ...
def list_join(expr: PyExpr, delimiter: PyExpr) -> PyExpr: ...
def list_count(expr: PyExpr, mode: CountMode) -> PyExpr: ...
def list_get(expr: PyExpr, idx: PyExpr, default: PyExpr) -> PyExpr: ...
@@ -1326,6 +1337,7 @@ class PySeries:
def count(self, mode: CountMode) -> PySeries: ...
def sum(self) -> PySeries: ...
def mean(self) -> PySeries: ...
def stddev(self) -> PySeries: ...
def min(self) -> PySeries: ...
def max(self) -> PySeries: ...
def agg_list(self) -> PySeries: ...
55 changes: 55 additions & 0 deletions daft/dataframe/dataframe.py
@@ -2118,6 +2118,33 @@ def mean(self, *cols: ColumnInputType) -> "DataFrame":
"""
return self._apply_agg_fn(Expression.mean, cols)

@DataframePublicAPI
def stddev(self, *cols: ColumnInputType) -> "DataFrame":
"""Performs a global standard deviation on the DataFrame
Example:
>>> import daft
>>> df = daft.from_pydict({"col_a":[0,1,2]})
>>> df = df.stddev("col_a")
>>> df.show()
╭───────────────────╮
│ col_a │
│ --- │
│ Float64 │
╞═══════════════════╡
│ 0.816496580927726 │
╰───────────────────╯
<BLANKLINE>
(Showing first 1 of 1 rows)
Args:
*cols (Union[str, Expression]): columns to stddev
Returns:
DataFrame: Globally aggregated standard deviation. Should be a single row.
"""
return self._apply_agg_fn(Expression.stddev, cols)
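Worth noting: the example output 0.816496580927726 is sqrt(2/3), i.e. the population standard deviation (divide by N) of [0, 1, 2]; the sample statistic (divide by N-1) would be 1.0. This is an inference from the doctest output, not a documented spec. A plain-Python check of that reading:

```python
values = [0, 1, 2]
mean = sum(values) / len(values)
# Population variance divides by N; the sample form would use N - 1.
pop_var = sum((v - mean) ** 2 for v in values) / len(values)
print(pop_var ** 0.5)  # 0.816496580927726, matching the doctest above
```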

@DataframePublicAPI
def min(self, *cols: ColumnInputType) -> "DataFrame":
"""Performs a global min on the DataFrame
@@ -2856,6 +2883,34 @@ def mean(self, *cols: ColumnInputType) -> "DataFrame":
"""
return self.df._apply_agg_fn(Expression.mean, cols, self.group_by)

def stddev(self, *cols: ColumnInputType) -> "DataFrame":
"""Performs grouped standard deviation on this GroupedDataFrame.
Example:
>>> import daft
>>> df = daft.from_pydict({"keys": ["a", "a", "a", "b"], "col_a": [0,1,2,100]})
>>> df = df.groupby("keys").stddev()
>>> df.show()
╭──────┬───────────────────╮
│ keys ┆ col_a │
│ --- ┆ --- │
│ Utf8 ┆ Float64 │
╞══════╪═══════════════════╡
│ a ┆ 0.816496580927726 │
├╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
│ b ┆ 0 │
╰──────┴───────────────────╯
<BLANKLINE>
(Showing first 2 of 2 rows)
Args:
*cols (Union[str, Expression]): columns to stddev
Returns:
DataFrame: DataFrame with grouped standard deviation.
"""
return self.df._apply_agg_fn(Expression.stddev, cols, self.group_by)
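The grouped output is consistent with the same population (N-divisor) definition: the single-element group "b" yields 0, where a sample estimator would be undefined.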

def min(self, *cols: ColumnInputType) -> "DataFrame":
"""Perform grouped min on this GroupedDataFrame.
8 changes: 7 additions & 1 deletion daft/delta_lake/delta_lake_scan.py
@@ -22,12 +22,15 @@

if TYPE_CHECKING:
from collections.abc import Iterator
from datetime import datetime

logger = logging.getLogger(__name__)


class DeltaLakeScanOperator(ScanOperator):
def __init__(self, table_uri: str, storage_config: StorageConfig) -> None:
def __init__(
self, table_uri: str, storage_config: StorageConfig, version: int | str | datetime | None = None
) -> None:
super().__init__()

# Unfortunately delta-rs doesn't do very good inference of credentials for S3. Thus the current Daft behavior of passing
@@ -67,6 +70,9 @@ def __init__(self, table_uri: str, storage_config: StorageConfig) -> None:
table_uri, storage_options=io_config_to_storage_options(deltalake_sdk_io_config, table_uri)
)

if version is not None:
self._table.load_as_version(version)

self._storage_config = storage_config
self._schema = Schema.from_pyarrow_schema(self._table.schema().to_pyarrow())
partition_columns = set(self._table.metadata().partition_columns)
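A hedged sketch of the Delta Lake time travel this enables. Only the operator change is shown in this diff, so it is an assumption here that the public `daft.read_deltalake` entry point forwards `version` to `DeltaLakeScanOperator`:

```python
import daft

# `version` may be an int snapshot id, a version string, or a datetime,
# mirroring the operator signature above (forwarding is assumed).
df = daft.read_deltalake("s3://bucket/my-table", version=3)
```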
1 change: 1 addition & 0 deletions daft/execution/physical_plan.py
@@ -241,6 +241,7 @@ def actor_pool_project(
with get_context().runner().actor_pool_context(
actor_pool_name,
actor_resource_request,
task_resource_request,
num_actors,
projection,
) as actor_pool_id:
