Merge branch main into colin/swordfish-outer-join
Colin Ho authored and Colin Ho committed Oct 9, 2024
2 parents d609336 + 73ff3f3 commit 09c785c
Showing 420 changed files with 10,228 additions and 4,695 deletions.
2 changes: 2 additions & 0 deletions .cargo/config.toml
@@ -0,0 +1,2 @@
+[env]
+PYO3_PYTHON = "./.venv/bin/python"
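The new `[env]` table pins the interpreter PyO3 links against to the repository virtualenv, so `cargo build` and rust-analyzer stop depending on whichever `python` is first on `PATH`. A minimal pre-build sanity check for that pinned path (illustrative only, not part of the commit):

```python
# Illustrative pre-build check: verify the interpreter that PYO3_PYTHON points
# at actually exists and runs, before invoking cargo.
import subprocess
from pathlib import Path

venv_python = Path(".venv") / "bin" / "python"
if not venv_python.exists():
    raise SystemExit("Create the venv first, e.g. `python -m venv .venv`")
print(subprocess.check_output([str(venv_python), "--version"], text=True).strip())
```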
1 change: 1 addition & 0 deletions .git-blame-ignore-revs
@@ -1 +1,2 @@
 d5e444d0a71409ae3701d4249ad877f1fb9e2235 # introduced `rustfmt.toml` and ran formatter; ignoring large formatting changes
+45e2944e252ccdd563dc20edd9b29762e05cec1d # auto-fix prefer `Self` over explicit type
2 changes: 1 addition & 1 deletion .github/workflows/property-based-tests.yml
@@ -11,7 +11,7 @@ env:
 jobs:
   test:
     runs-on: ubuntu-latest
-    timeout-minutes: 15
+    timeout-minutes: 30
     strategy:
       fail-fast: false
       matrix:
7 changes: 6 additions & 1 deletion .github/workflows/python-package.yml
@@ -545,6 +545,10 @@ jobs:
       matrix:
         python-version: ['3.8'] # can't use 3.7 due to requiring anon mode for adlfs
         daft-runner: [py, ray]
+        enable-native-executor: [0, 1]
+        exclude:
+          - daft-runner: ray
+            enable-native-executor: 1
     steps:
       - uses: actions/checkout@v4
         with:
@@ -583,6 +587,7 @@ jobs:
           pytest tests/integration/sql -m 'integration or not integration' --durations=50
         env:
           DAFT_RUNNER: ${{ matrix.daft-runner }}
+          DAFT_ENABLE_NATIVE_EXECUTOR: ${{ matrix.enable-native-executor }}
       - name: Send Slack notification on failure
         uses: slackapi/slack-github-action@…
         if: ${{ failure() && (github.ref == 'refs/heads/main') }}
@@ -754,7 +759,7 @@ jobs:
           channel: stable

       - name: Install Machete
-        run: cargo +stable install cargo-machete
+        run: cargo +stable install cargo-machete@0.7.0 --locked
       - name: Run Machete
         run: cargo machete --with-metadata

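The new `enable-native-executor` matrix axis runs the integration suites both with and without the native (swordfish) executor, and the `exclude` entry skips the combination with the ray runner. A hedged sketch of reproducing one matrix cell locally; the environment variable names are taken verbatim from the workflow above, everything else is illustrative:

```python
# Sketch: run a query the way one CI matrix cell would.
# Set the env vars before importing daft so the runner configuration sees them.
import os

os.environ["DAFT_RUNNER"] = "py"                 # matrix.daft-runner
os.environ["DAFT_ENABLE_NATIVE_EXECUTOR"] = "1"  # matrix.enable-native-executor

import daft

df = daft.from_pydict({"a": [1, 2, 3]})
print(df.sum("a").to_pydict())
```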
7 changes: 6 additions & 1 deletion .vscode/settings.json
@@ -3,5 +3,10 @@
     "CARGO_TARGET_DIR": "target/analyzer"
   },
   "rust-analyzer.check.features": "all",
-  "rust-analyzer.cargo.features": "all"
+  "rust-analyzer.cargo.features": "all",
+  "python.testing.pytestArgs": [
+    "tests"
+  ],
+  "python.testing.unittestEnabled": false,
+  "python.testing.pytestEnabled": true
 }
3 changes: 3 additions & 0 deletions Cargo.lock

(Generated file; diff not rendered.)

73 changes: 73 additions & 0 deletions Cargo.toml
@@ -82,8 +82,12 @@ parquet2 = {path = "src/parquet2"}
 debug = true

 [profile.dev]
+debug = "line-tables-only"
+overflow-checks = false

+[profile.dev.build-override]
+opt-level = 3

 [profile.dev-bench]
 codegen-units = 16
 debug = 1 # include symbols
@@ -225,7 +229,76 @@ features = ["derive", "rc"]
 version = "1.0.200"

 [workspace.lints.clippy]
+as_conversions = "allow"
+cast-sign-loss = "allow"
+cast_lossless = "allow"
+cast_possible_truncation = "allow"
+cast_possible_wrap = "allow"
+cast_precision_loss = "allow"
+cognitive_complexity = "allow"
+default_trait_access = "allow"
+doc-markdown = "allow"
+doc_link_with_quotes = "allow"
+enum_glob_use = "allow"
+float_cmp = "allow"
+fn_params_excessive_bools = "allow"
+from_iter_instead_of_collect = "allow"
+future_not_send = "allow"
+if_not_else = "allow"
+implicit_hasher = "allow"
+inline_always = "allow"
+into_iter_without_iter = "allow"
+items_after_statements = "allow"
+iter_with_drain = "allow" # REMOVE
+iter_without_into_iter = "allow"
+manual_let_else = "allow"
+many_single_char_names = "allow"
+map_unwrap_or = "allow"
+match_bool = "allow"
+match_same_arms = "allow"
+match_wildcard_for_single_variants = "allow"
+missing-panics-doc = "allow"
+missing_const_for_fn = "allow"
+missing_errors_doc = "allow"
+module_name_repetitions = "allow"
+must_use_candidate = "allow"
+needless_pass_by_value = "allow"
+needless_return = "allow"
+nonminimal_bool = "allow"
+nursery = {level = "deny", priority = -1}
+only_used_in_recursion = "allow"
+option_if_let_else = "allow"
+pedantic = {level = "deny", priority = -1}
+perf = {level = "deny", priority = -1}
+redundant_closure = "allow"
+redundant_closure_for_method_calls = "allow"
+redundant_else = "allow"
+redundant_pub_crate = "allow"
+return_self_not_must_use = "allow"
+significant_drop_in_scrutinee = "allow" # REMOVE
+significant_drop_tightening = "allow" # REMOVE
+similar_names = "allow"
+single_match = "allow"
+single_match_else = "allow"
+struct_excessive_bools = "allow"
+style = {level = "deny", priority = 1}
+suspicious_operation_groupings = "allow"
+too_many_lines = "allow"
+trivially_copy_pass_by_ref = "allow"
+type_repetition_in_bounds = "allow"
+uninlined_format_args = "allow"
+unnecessary_wraps = "allow"
+unnested_or_patterns = "allow"
+unreadable_literal = "allow"
+# todo: remove?
+unsafe_derive_deserialize = "allow"
+unused_async = "allow"
+# used_underscore_items = "allow" # REMOVE
+unused_self = "allow"
+use-self = "deny"
+used_underscore_binding = "allow" # REMOVE REMOVE
+wildcard_imports = "allow"
+zero_sized_map_values = "allow"

 [workspace.package]
 edition = "2021"
10 changes: 5 additions & 5 deletions README.rst
@@ -4,13 +4,13 @@

 `Website <https://www.getdaft.io>`_ • `Docs <https://www.getdaft.io/projects/docs/>`_ • `Installation`_ • `10-minute tour of Daft <https://www.getdaft.io/projects/docs/en/latest/learn/10-min.html>`_ • `Community and Support <https://github.com/Eventual-Inc/Daft/discussions>`_

-Daft: Distributed dataframes for multimodal data
-=======================================================
+Daft: Unified Engine for Data Analytics, Engineering & ML/AI
+============================================================


-`Daft <https://www.getdaft.io>`_ is a distributed query engine for large-scale data processing in Python and is implemented in Rust.
+`Daft <https://www.getdaft.io>`_ is a distributed query engine for large-scale data processing using Python or SQL, implemented in Rust.

-* **Familiar interactive API:** Lazy Python Dataframe for rapid and interactive iteration
+* **Familiar interactive API:** Lazy Python Dataframe for rapid and interactive iteration, or SQL for analytical queries
 * **Focus on the what:** Powerful Query Optimizer that rewrites queries to be as efficient as possible
 * **Data Catalog integrations:** Full integration with data catalogs such as Apache Iceberg
 * **Rich multimodal type-system:** Supports multimodal types such as Images, URLs, Tensors and more
@@ -51,7 +51,7 @@ Quickstart

 In this example, we load images from an AWS S3 bucket's URLs and resize each image in the dataframe:

-.. code::
+.. code:: python

     import daft
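The quickstart snippet is cut off after the import in this view. Below is a sketch of the pipeline the sentence above describes (download the images behind a URL column, decode them, resize them), using Daft's `url` and `image` expression namespaces; the data is a placeholder, not the README's actual bucket:

```python
import daft
from daft import col

# Placeholder URLs standing in for the README's S3 bucket.
df = daft.from_pydict({"image_url": ["s3://bucket/a.jpg", "s3://bucket/b.jpg"]})

df = df.with_column("image_bytes", col("image_url").url.download())
df = df.with_column("image", col("image_bytes").image.decode())
df = df.with_column("thumbnail", col("image").image.resize(32, 32))
df.show()
```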
25 changes: 22 additions & 3 deletions daft/daft/__init__.pyi
@@ -9,7 +9,7 @@ from daft.io.scan import ScanOperator
 from daft.plan_scheduler.physical_plan_scheduler import PartitionT
 from daft.runners.partitioning import PartitionCacheEntry
 from daft.sql.sql_connection import SQLConnection
-from daft.udf import PartialStatefulUDF, PartialStatelessUDF
+from daft.udf import InitArgsType, PartialStatefulUDF, PartialStatelessUDF

 if TYPE_CHECKING:
     import pyarrow as pa
@@ -1051,6 +1051,7 @@ class PyExpr:
     def approx_count_distinct(self) -> PyExpr: ...
     def approx_percentiles(self, percentiles: float | list[float]) -> PyExpr: ...
     def mean(self) -> PyExpr: ...
+    def stddev(self) -> PyExpr: ...
     def min(self) -> PyExpr: ...
     def max(self) -> PyExpr: ...
     def any_value(self, ignore_nulls: bool) -> PyExpr: ...
@@ -1134,6 +1135,7 @@ def lit(item: Any) -> PyExpr: ...
 def date_lit(item: int) -> PyExpr: ...
 def time_lit(item: int, tu: PyTimeUnit) -> PyExpr: ...
 def timestamp_lit(item: int, tu: PyTimeUnit, tz: str | None) -> PyExpr: ...
+def duration_lit(item: int, tu: PyTimeUnit) -> PyExpr: ...
 def decimal_lit(sign: bool, digits: tuple[int, ...], exp: int) -> PyExpr: ...
 def series_lit(item: PySeries) -> PyExpr: ...
 def stateless_udf(
@@ -1150,12 +1152,14 @@ def stateful_udf(
     expressions: list[PyExpr],
     return_dtype: PyDataType,
     resource_request: ResourceRequest | None,
-    init_args: tuple[tuple[Any, ...], dict[str, Any]] | None,
+    init_args: InitArgsType,
     batch_size: int | None,
     concurrency: int | None,
 ) -> PyExpr: ...
 def check_column_name_validity(name: str, schema: PySchema): ...
-def extract_partial_stateful_udf_py(expression: PyExpr) -> dict[str, PartialStatefulUDF]: ...
+def extract_partial_stateful_udf_py(
+    expression: PyExpr,
+) -> dict[str, tuple[PartialStatefulUDF, InitArgsType]]: ...
 def bind_stateful_udfs(expression: PyExpr, initialized_funcs: dict[str, Callable]) -> PyExpr: ...
 def resolve_expr(expr: PyExpr, schema: PySchema) -> tuple[PyExpr, PyField]: ...
 def hash(expr: PyExpr, seed: Any | None = None) -> PyExpr: ...
@@ -1195,8 +1199,21 @@ def minhash(
     ngram_size: int,
     seed: int = 1,
 ) -> PyExpr: ...
+
+# -----
+# SQL functions
+# -----
+class SQLFunctionStub:
+    @property
+    def name(self) -> str: ...
+    @property
+    def docstring(self) -> str: ...
+    @property
+    def arg_names(self) -> list[str]: ...
+
 def sql(sql: str, catalog: PyCatalog, daft_planning_config: PyDaftPlanningConfig) -> LogicalPlanBuilder: ...
 def sql_expr(sql: str) -> PyExpr: ...
+def list_sql_functions() -> list[SQLFunctionStub]: ...
 def utf8_count_matches(expr: PyExpr, patterns: PyExpr, whole_words: bool, case_sensitive: bool) -> PyExpr: ...
 def to_struct(inputs: list[PyExpr]) -> PyExpr: ...

@@ -1268,6 +1285,7 @@ def dt_truncate(expr: PyExpr, interval: str, relative_to: PyExpr) -> PyExpr: ...
 # ---
 def explode(expr: PyExpr) -> PyExpr: ...
 def list_sort(expr: PyExpr, desc: PyExpr) -> PyExpr: ...
+def list_value_counts(expr: PyExpr) -> PyExpr: ...
 def list_join(expr: PyExpr, delimiter: PyExpr) -> PyExpr: ...
 def list_count(expr: PyExpr, mode: CountMode) -> PyExpr: ...
 def list_get(expr: PyExpr, idx: PyExpr, default: PyExpr) -> PyExpr: ...
@@ -1319,6 +1337,7 @@ class PySeries:
     def count(self, mode: CountMode) -> PySeries: ...
     def sum(self) -> PySeries: ...
     def mean(self) -> PySeries: ...
+    def stddev(self) -> PySeries: ...
     def min(self) -> PySeries: ...
     def max(self) -> PySeries: ...
     def agg_list(self) -> PySeries: ...
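Several of these stubs back new user-facing APIs: `stddev` on expressions and series, plus the SQL entry points (`sql`, `sql_expr`, `list_sql_functions`). A hedged sketch of how they surface at the Python level; that `daft.sql` resolves DataFrames from the caller's scope is an assumption about the API of this era:

```python
import daft
from daft import col

df = daft.from_pydict({"keys": ["a", "a", "b"], "v": [0.0, 1.0, 2.0]})

# New stddev aggregation, globally and per group.
print(df.agg(col("v").stddev()).to_pydict())
print(df.groupby("keys").stddev("v").to_pydict())

# New SQL entry points: daft.sql is assumed to pick up `df` from the enclosing
# scope, and sql_expr parses a scalar SQL expression into a Daft expression.
print(daft.sql("SELECT keys, v + 1 AS v1 FROM df").collect())
print(daft.sql_expr("v + 1"))
```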
55 changes: 55 additions & 0 deletions daft/dataframe/dataframe.py
@@ -2118,6 +2118,33 @@ def mean(self, *cols: ColumnInputType) -> "DataFrame":
         """
         return self._apply_agg_fn(Expression.mean, cols)

+    @DataframePublicAPI
+    def stddev(self, *cols: ColumnInputType) -> "DataFrame":
+        """Performs a global standard deviation on the DataFrame
+
+        Example:
+            >>> import daft
+            >>> df = daft.from_pydict({"col_a":[0,1,2]})
+            >>> df = df.stddev("col_a")
+            >>> df.show()
+            ╭───────────────────╮
+            │ col_a             │
+            │ ---               │
+            │ Float64           │
+            ╞═══════════════════╡
+            │ 0.816496580927726 │
+            ╰───────────────────╯
+            <BLANKLINE>
+            (Showing first 1 of 1 rows)
+
+        Args:
+            *cols (Union[str, Expression]): columns to stddev
+
+        Returns:
+            DataFrame: Globally aggregated standard deviation. Should be a single row.
+        """
+        return self._apply_agg_fn(Expression.stddev, cols)
+
     @DataframePublicAPI
     def min(self, *cols: ColumnInputType) -> "DataFrame":
         """Performs a global min on the DataFrame
@@ -2856,6 +2883,34 @@ def mean(self, *cols: ColumnInputType) -> "DataFrame":
         """
         return self.df._apply_agg_fn(Expression.mean, cols, self.group_by)

+    def stddev(self, *cols: ColumnInputType) -> "DataFrame":
+        """Performs grouped standard deviation on this GroupedDataFrame.
+
+        Example:
+            >>> import daft
+            >>> df = daft.from_pydict({"keys": ["a", "a", "a", "b"], "col_a": [0,1,2,100]})
+            >>> df = df.groupby("keys").stddev()
+            >>> df.show()
+            ╭──────┬───────────────────╮
+            │ keys ┆ col_a             │
+            │ ---  ┆ ---               │
+            │ Utf8 ┆ Float64           │
+            ╞══════╪═══════════════════╡
+            │ a    ┆ 0.816496580927726 │
+            ├╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
+            │ b    ┆ 0                 │
+            ╰──────┴───────────────────╯
+            <BLANKLINE>
+            (Showing first 2 of 2 rows)
+
+        Args:
+            *cols (Union[str, Expression]): columns to stddev
+
+        Returns:
+            DataFrame: DataFrame with grouped standard deviation.
+        """
+        return self.df._apply_agg_fn(Expression.stddev, cols, self.group_by)
+
     def min(self, *cols: ColumnInputType) -> "DataFrame":
         """Perform grouped min on this GroupedDataFrame.
8 changes: 7 additions & 1 deletion daft/delta_lake/delta_lake_scan.py
@@ -22,12 +22,15 @@

 if TYPE_CHECKING:
     from collections.abc import Iterator
+    from datetime import datetime

 logger = logging.getLogger(__name__)


 class DeltaLakeScanOperator(ScanOperator):
-    def __init__(self, table_uri: str, storage_config: StorageConfig) -> None:
+    def __init__(
+        self, table_uri: str, storage_config: StorageConfig, version: int | str | datetime | None = None
+    ) -> None:
         super().__init__()

         # Unfortunately delta-rs doesn't do very good inference of credentials for S3. Thus the current Daft behavior of passing
@@ -67,6 +70,9 @@ def __init__(self, table_uri: str, storage_config: StorageConfig) -> None:
             table_uri, storage_options=io_config_to_storage_options(deltalake_sdk_io_config, table_uri)
         )

+        if version is not None:
+            self._table.load_as_version(version)
+
         self._storage_config = storage_config
         self._schema = Schema.from_pyarrow_schema(self._table.schema().to_pyarrow())
         partition_columns = set(self._table.metadata().partition_columns)
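`DeltaLakeScanOperator` now accepts an optional `version` (int, str, or datetime) and pins the table snapshot via delta-rs's `load_as_version`, i.e. Delta Lake time travel. A sketch of the intended user-facing effect; the `version` keyword on `daft.read_deltalake` is an assumption about how the new parameter is threaded through:

```python
import daft
from datetime import datetime, timezone

# Hypothetical usage: read the latest snapshot vs. a pinned older version.
df_latest = daft.read_deltalake("s3://bucket/events_table")
df_pinned = daft.read_deltalake("s3://bucket/events_table", version=3)

# Per the operator's new signature, a datetime should also work.
df_asof = daft.read_deltalake(
    "s3://bucket/events_table",
    version=datetime(2024, 10, 1, tzinfo=timezone.utc),
)
```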
1 change: 1 addition & 0 deletions daft/execution/physical_plan.py
@@ -241,6 +241,7 @@ def actor_pool_project(
     with get_context().runner().actor_pool_context(
         actor_pool_name,
         actor_resource_request,
+        task_resource_request,
         num_actors,
         projection,
     ) as actor_pool_id:
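`actor_pool_context` now receives the per-task resource request alongside the actor-level one, so the pool can account for both when sizing and placing actors. This machinery backs stateful (class-based) UDFs that run on a fixed pool of actors. A sketch, assuming `@daft.udf` accepted a `concurrency` argument for class UDFs at this point (matching the `stateful_udf(..., concurrency)` stub earlier in this commit):

```python
import daft

# Sketch of an actor-pool UDF: __init__ runs once per actor (e.g. loading a
# model); __call__ handles batches. `concurrency` sets the number of actors.
@daft.udf(return_dtype=daft.DataType.string(), concurrency=2)
class Prefixer:
    def __init__(self):
        self.prefix = ">> "  # stand-in for expensive one-time setup

    def __call__(self, col):
        return [self.prefix + str(v) for v in col.to_pylist()]

df = daft.from_pydict({"v": [1, 2, 3]})
df.with_column("tagged", Prefixer(daft.col("v"))).show()
```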
(Remaining file diffs not shown.)
