
Merge branch 'main' into colin/reduce-shuffle-partitions-after-1st-agg
colin-ho authored Feb 13, 2024
2 parents 5e5a69c + adac24c commit 2cb12d2
Showing 39 changed files with 475 additions and 111 deletions.
21 changes: 14 additions & 7 deletions README.rst
@@ -4,11 +4,18 @@

`Website <https://www.getdaft.io>`_ • `Docs <https://www.getdaft.io/projects/docs/>`_ • `Installation`_ • `10-minute tour of Daft <https://www.getdaft.io/projects/docs/en/latest/learn/10-min.html>`_ • `Community and Support <https://github.com/Eventual-Inc/Daft/discussions>`_

Daft: the distributed Python dataframe for complex data
Daft: Distributed dataframes for multimodal data
=======================================================


`Daft <https://www.getdaft.io>`_ is a fast, Pythonic and scalable open-source dataframe library built for Python and Machine Learning workloads.
`Daft <https://www.getdaft.io>`_ is a distributed query engine for large-scale data processing in Python and is implemented in Rust.

* **Familiar interactive API:** Lazy Python Dataframe for rapid and interactive iteration
* **Focus on the what:** Powerful Query Optimizer that rewrites queries to be as efficient as possible
* **Data Catalog integrations:** Full integration with data catalogs such as Apache Iceberg
* **Rich multimodal type-system:** Supports multimodal types such as Images, URLs, Tensors and more
* **Seamless Interchange**: Built on the `Apache Arrow <https://arrow.apache.org/docs/index.html>`_ In-Memory Format
* **Built for the cloud:** `Record-setting <https://blog.getdaft.io/p/announcing-daft-02-10x-faster-io>`_ I/O performance for integrations with S3 cloud storage

**Table of Contents**

@@ -21,11 +28,11 @@ Daft: the distributed Python dataframe for complex data
About Daft
----------

The Daft dataframe is a table of data with rows and columns. Columns can contain any Python objects, which allows Daft to support rich complex data types such as images, audio, video and more.
Daft was designed with the following principles in mind:

1. **Any Data**: Beyond the usual strings/numbers/dates, Daft columns can also hold complex multimodal data such as Images, Embeddings and Python objects. Ingestion and basic transformations of complex data is extremely easy and performant in Daft.
2. **Notebook Computing**: Daft is built for the interactive developer experience on a notebook - intelligent caching/query optimizations accelerates your experimentation and data exploration.
3. **Distributed Computing**: Rich complex formats such as images can quickly outgrow your local laptop's computational resources - Daft integrates natively with `Ray <https://www.ray.io>`_ for running dataframes on large clusters of machines with thousands of CPUs/GPUs.
1. **Any Data**: Beyond the usual strings/numbers/dates, Daft columns can also hold complex or nested multimodal data such as Images, Embeddings and Python objects efficiently with its Arrow-based memory representation. Ingestion and basic transformations of multimodal data are extremely easy and performant in Daft.
2. **Interactive Computing**: Daft is built for the interactive developer experience through notebooks or REPLs - intelligent caching/query optimizations accelerate your experimentation and data exploration.
3. **Distributed Computing**: Some workloads can quickly outgrow your local laptop's computational resources - Daft integrates natively with `Ray <https://www.ray.io>`_ for running dataframes on large clusters of machines with thousands of CPUs/GPUs.

Getting Started
---------------
@@ -101,7 +108,7 @@ Related Projects
----------------

+---------------------------------------------------+-----------------+---------------+-------------+-----------------+-----------------------------+-------------+
| Dataframe | Query Optimizer | Complex Types | Distributed | Arrow Backed | Vectorized Execution Engine | Out-of-core |
| Dataframe | Query Optimizer | Multimodal | Distributed | Arrow Backed | Vectorized Execution Engine | Out-of-core |
+===================================================+=================+===============+=============+=================+=============================+=============+
| Daft | Yes | Yes | Yes | Yes | Yes | Yes |
+---------------------------------------------------+-----------------+---------------+-------------+-----------------+-----------------------------+-------------+
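The feature list and design principles above are easiest to see in a small end-to-end snippet. Below is a minimal sketch of the lazy dataframe API the README describes, not part of this commit; daft.from_pydict, daft.col, where, with_column and show are Daft's public Python API at the time of writing and may differ between releases.

import daft
from daft import col

# Construction is eager, but the transformations below stay lazy until
# show()/collect() triggers execution, which lets the query optimizer
# rewrite the plan before anything runs.
df = daft.from_pydict({"image_url": ["s3://bucket/a.png", "s3://bucket/b.png"], "score": [0.9, 0.4]})
df = df.where(col("score") > 0.5).with_column("score_pct", col("score") * 100)
df.show()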
11 changes: 8 additions & 3 deletions daft/analytics.py
@@ -9,16 +9,16 @@
import logging
import os
import platform
import random
import time
import urllib.error
import urllib.request
import uuid
from typing import Any, Callable

from daft import context

_ANALYTICS_CLIENT = None
_WRITE_KEY = "ebFETjqH70OOvtDvrlBC902iljBZGvPU"
_WRITE_KEY = "opL9scJXH6GKdIYgPdA0ncCj8i920LJq"
_SEGMENT_BATCH_ENDPOINT = "https://api.segment.io/v1/batch"


@@ -33,6 +33,11 @@ class AnalyticsEvent:
data: dict[str, Any]


def _get_session_key():
# Restrict the cardinality of keys to 8000
return f"anon-{random.randint(1, 8000)}"


def _build_segment_batch_payload(
events: list[AnalyticsEvent], daft_version: str, daft_build_type: str
) -> dict[str, Any]:
@@ -86,7 +91,7 @@ def __init__(
) -> None:
self._daft_version = daft_version
self._daft_build_type = daft_build_type
self._session_key = str(uuid.uuid4())
self._session_key = _get_session_key()

# Function to publish a payload to Segment
self._publish = publish_payload_function
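For context on the analytics.py change above: replacing str(uuid.uuid4()) with _get_session_key() bounds the number of distinct session identifiers sent to Segment. A quick sanity-check sketch, not part of the commit, illustrating the cardinality cap:

import random

def _get_session_key() -> str:
    # Same helper as in the diff: restrict the cardinality of keys to 8000.
    return f"anon-{random.randint(1, 8000)}"

keys = {_get_session_key() for _ in range(100_000)}
assert len(keys) <= 8000  # bounded, unlike unbounded UUID4 session keys
print(len(keys), "distinct anonymized session keys")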
155 changes: 97 additions & 58 deletions daft/context.py
@@ -13,6 +13,8 @@

logger = logging.getLogger(__name__)

import threading


class _RunnerConfig:
name = ClassVar[str]
@@ -59,28 +61,60 @@ class DaftContext:

# When a dataframe is executed, this config is copied into the Runner
# which then keeps track of a per-unique-execution-ID copy of the config, using it consistently throughout the execution
daft_execution_config: PyDaftExecutionConfig = PyDaftExecutionConfig()
_daft_execution_config: PyDaftExecutionConfig = PyDaftExecutionConfig()

# Non-execution calls (e.g. creation of a dataframe, logical plan building etc) directly reference values in this config
daft_planning_config: PyDaftPlanningConfig = PyDaftPlanningConfig()
_daft_planning_config: PyDaftPlanningConfig = PyDaftPlanningConfig()

runner_config: _RunnerConfig = dataclasses.field(default_factory=_get_runner_config_from_env)
disallow_set_runner: bool = False
_runner_config: _RunnerConfig = dataclasses.field(default_factory=_get_runner_config_from_env)
_disallow_set_runner: bool = False
_runner: Runner | None = None

_instance: ClassVar[DaftContext | None] = None
_lock: ClassVar[threading.Lock] = threading.Lock()

def __new__(cls):
if cls._instance is None:
with cls._lock:
# Another thread could have created the instance
# before we acquired the lock. So check that the
# instance is still nonexistent.
if not cls._instance:
cls._instance = super().__new__(cls)
return cls._instance

def runner(self) -> Runner:
with self._lock:
return self._get_runner()

@property
def daft_execution_config(self) -> PyDaftExecutionConfig:
with self._lock:
return self._daft_execution_config

@property
def daft_planning_config(self) -> PyDaftPlanningConfig:
with self._lock:
return self._daft_planning_config

@property
def runner_config(self) -> _RunnerConfig:
with self._lock:
return self._runner_config

def _get_runner(self) -> Runner:
if self._runner is not None:
return self._runner

if self.runner_config.name == "ray":
if self._runner_config.name == "ray":
from daft.runners.ray_runner import RayRunner

assert isinstance(self.runner_config, _RayRunnerConfig)
assert isinstance(self._runner_config, _RayRunnerConfig)
self._runner = RayRunner(
address=self.runner_config.address,
max_task_backlog=self.runner_config.max_task_backlog,
address=self._runner_config.address,
max_task_backlog=self._runner_config.max_task_backlog,
)
elif self.runner_config.name == "py":
elif self._runner_config.name == "py":
from daft.runners.pyrunner import PyRunner

try:
@@ -96,21 +130,22 @@ def runner(self) -> Runner:
except ImportError:
pass

assert isinstance(self.runner_config, _PyRunnerConfig)
self._runner = PyRunner(use_thread_pool=self.runner_config.use_thread_pool)
assert isinstance(self._runner_config, _PyRunnerConfig)
self._runner = PyRunner(use_thread_pool=self._runner_config.use_thread_pool)

else:
raise NotImplementedError(f"Runner config implemented: {self.runner_config.name}")
raise NotImplementedError(f"Runner config implemented: {self._runner_config.name}")

# Mark DaftContext as having the runner set, which prevents any subsequent setting of the config
# after the runner has been initialized once
self.disallow_set_runner = True
self._disallow_set_runner = True

return self._runner

@property
def is_ray_runner(self) -> bool:
return isinstance(self.runner_config, _RayRunnerConfig)
with self._lock:
return isinstance(self._runner_config, _RayRunnerConfig)


_DaftContext = DaftContext()
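With __new__ guarded by a class-level lock, DaftContext now behaves as a process-wide singleton, and the module-level _DaftContext is simply the first (and only) instance. A brief illustration, assuming get_context() keeps returning that shared instance as it does in the functions below:

from daft.context import get_context

a = get_context()
b = get_context()
assert a is b  # one shared, lock-protected context per process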
@@ -144,20 +179,21 @@ def set_runner_ray(
DaftContext: Daft context after setting the Ray runner
"""
ctx = get_context()
if ctx.disallow_set_runner:
if noop_if_initialized:
warnings.warn(
"Calling daft.context.set_runner_ray(noop_if_initialized=True) multiple times has no effect beyond the first call."
)
return ctx
raise RuntimeError("Cannot set runner more than once")

ctx.runner_config = _RayRunnerConfig(
address=address,
max_task_backlog=max_task_backlog,
)
ctx.disallow_set_runner = True
return ctx
with ctx._lock:
if ctx._disallow_set_runner:
if noop_if_initialized:
warnings.warn(
"Calling daft.context.set_runner_ray(noop_if_initialized=True) multiple times has no effect beyond the first call."
)
return ctx
raise RuntimeError("Cannot set runner more than once")

ctx._runner_config = _RayRunnerConfig(
address=address,
max_task_backlog=max_task_backlog,
)
ctx._disallow_set_runner = True
return ctx


def set_runner_py(use_thread_pool: bool | None = None) -> DaftContext:
@@ -169,12 +205,13 @@ def set_runner_py(use_thread_pool: bool | None = None) -> DaftContext:
DaftContext: Daft context after setting the Py runner
"""
ctx = get_context()
if ctx.disallow_set_runner:
raise RuntimeError("Cannot set runner more than once")
with ctx._lock:
if ctx._disallow_set_runner:
raise RuntimeError("Cannot set runner more than once")

ctx.runner_config = _PyRunnerConfig(use_thread_pool=use_thread_pool)
ctx.disallow_set_runner = True
return ctx
ctx._runner_config = _PyRunnerConfig(use_thread_pool=use_thread_pool)
ctx._disallow_set_runner = True
return ctx


def set_planning_config(
@@ -192,13 +229,14 @@ def set_planning_config(
"""
# Replace values in the DaftPlanningConfig with user-specified overrides
ctx = get_context()
old_daft_planning_config = ctx.daft_planning_config if config is None else config
new_daft_planning_config = old_daft_planning_config.with_config_values(
default_io_config=default_io_config,
)
with ctx._lock:
old_daft_planning_config = ctx._daft_planning_config if config is None else config
new_daft_planning_config = old_daft_planning_config.with_config_values(
default_io_config=default_io_config,
)

ctx.daft_planning_config = new_daft_planning_config
return ctx
ctx._daft_planning_config = new_daft_planning_config
return ctx


def set_execution_config(
@@ -248,22 +286,23 @@ def set_execution_config(
"""
# Replace values in the DaftExecutionConfig with user-specified overrides
ctx = get_context()
old_daft_execution_config = ctx.daft_execution_config if config is None else config
new_daft_execution_config = old_daft_execution_config.with_config_values(
scan_tasks_min_size_bytes=scan_tasks_min_size_bytes,
scan_tasks_max_size_bytes=scan_tasks_max_size_bytes,
broadcast_join_size_bytes_threshold=broadcast_join_size_bytes_threshold,
parquet_split_row_groups_max_files=parquet_split_row_groups_max_files,
sort_merge_join_sort_with_aligned_boundaries=sort_merge_join_sort_with_aligned_boundaries,
sample_size_for_sort=sample_size_for_sort,
num_preview_rows=num_preview_rows,
parquet_target_filesize=parquet_target_filesize,
parquet_target_row_group_size=parquet_target_row_group_size,
parquet_inflation_factor=parquet_inflation_factor,
csv_target_filesize=csv_target_filesize,
csv_inflation_factor=csv_inflation_factor,
shuffle_aggregation_default_partitions=shuffle_aggregation_default_partitions,
)

ctx.daft_execution_config = new_daft_execution_config
return ctx
with ctx._lock:
old_daft_execution_config = ctx._daft_execution_config if config is None else config
new_daft_execution_config = old_daft_execution_config.with_config_values(
scan_tasks_min_size_bytes=scan_tasks_min_size_bytes,
scan_tasks_max_size_bytes=scan_tasks_max_size_bytes,
broadcast_join_size_bytes_threshold=broadcast_join_size_bytes_threshold,
parquet_split_row_groups_max_files=parquet_split_row_groups_max_files,
sort_merge_join_sort_with_aligned_boundaries=sort_merge_join_sort_with_aligned_boundaries,
sample_size_for_sort=sample_size_for_sort,
num_preview_rows=num_preview_rows,
parquet_target_filesize=parquet_target_filesize,
parquet_target_row_group_size=parquet_target_row_group_size,
parquet_inflation_factor=parquet_inflation_factor,
csv_target_filesize=csv_target_filesize,
csv_inflation_factor=csv_inflation_factor,
shuffle_aggregation_default_partitions=shuffle_aggregation_default_partitions,
)

ctx._daft_execution_config = new_daft_execution_config
return ctx
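A short usage sketch for the setters above, which now take the context lock before swapping in a new config; shuffle_aggregation_default_partitions is the option this branch threads through, and the values here are purely illustrative:

from daft.context import set_execution_config

# Each call copies the current PyDaftExecutionConfig via with_config_values()
# and installs the resulting config under the lock.
set_execution_config(
    shuffle_aggregation_default_partitions=200,
    num_preview_rows=8,
)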
6 changes: 6 additions & 0 deletions daft/daft.pyi
@@ -422,6 +422,7 @@ class S3Config:
anonymous: bool
verify_ssl: bool
check_hostname_ssl: bool
requester_pays: bool | None

def __init__(
self,
@@ -439,6 +440,7 @@
anonymous: bool | None = None,
verify_ssl: bool | None = None,
check_hostname_ssl: bool | None = None,
requester_pays: bool | None = None,
): ...
def replace(
self,
@@ -456,6 +458,7 @@
anonymous: bool | None = None,
verify_ssl: bool | None = None,
check_hostname_ssl: bool | None = None,
requester_pays: bool | None = None,
) -> S3Config:
"""Replaces values if provided, returning a new S3Config"""
...
@@ -825,13 +828,15 @@ class PySchema:
def __reduce__(self) -> tuple: ...
def __repr__(self) -> str: ...
def _repr_html_(self) -> str: ...
def _truncated_table_string(self) -> str: ...

class PyExpr:
def _input_mapping(self) -> str | None: ...
def _required_columns(self) -> set[str]: ...
def _is_column(self) -> bool: ...
def alias(self, name: str) -> PyExpr: ...
def cast(self, dtype: PyDataType) -> PyExpr: ...
def ceil(self) -> PyExpr: ...
def if_else(self, if_true: PyExpr, if_false: PyExpr) -> PyExpr: ...
def count(self, mode: CountMode) -> PyExpr: ...
def sum(self) -> PyExpr: ...
@@ -940,6 +945,7 @@ class PySeries:
def _max(self) -> PySeries: ...
def _agg_list(self) -> PySeries: ...
def cast(self, dtype: PyDataType) -> PySeries: ...
def ceil(self) -> PySeries: ...
@staticmethod
def concat(series: list[PySeries]) -> PySeries: ...
def __len__(self) -> int: ...
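The new requester_pays field in the S3Config stub is used through Daft's IO config objects. A hedged sketch, assuming the S3Config and IOConfig exposed from daft.io mirror this PyO3 interface; the bucket path is a placeholder:

from daft.io import IOConfig, S3Config

io_config = IOConfig(s3=S3Config(region_name="us-west-2", requester_pays=True))
# Requester-pays buckets bill the reader for transfer costs, so the flag must
# be set explicitly before reading, e.g.:
# df = daft.read_parquet("s3://some-requester-pays-bucket/*.parquet", io_config=io_config)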
5 changes: 5 additions & 0 deletions daft/expressions/expressions.py
@@ -315,6 +315,11 @@ def cast(self, dtype: DataType) -> Expression:
expr = self._expr.cast(dtype._dtype)
return Expression._from_pyexpr(expr)

def ceil(self) -> Expression:
"""The ceiling of a numeric expression (``expr.ceil()``)"""
expr = self._expr.ceil()
return Expression._from_pyexpr(expr)

def _count(self, mode: CountMode = CountMode.Valid) -> Expression:
expr = self._expr.count(mode)
return Expression._from_pyexpr(expr)
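Expression.ceil() wires the new ceil kernel (added to PyExpr and PySeries above) into the public expression API. A short usage sketch, not part of the commit, with illustrative values:

import daft
from daft import col

df = daft.from_pydict({"x": [0.1, 1.5, 2.0]})
df = df.with_column("x_ceil", col("x").ceil())
df.show()  # x_ceil -> 1.0, 2.0, 2.0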
3 changes: 3 additions & 0 deletions daft/logical/schema.py
@@ -132,6 +132,9 @@ def __repr__(self) -> str:
def _repr_html_(self) -> str:
return self._schema._repr_html_()

def _truncated_table_string(self) -> str:
return self._schema._truncated_table_string()

def union(self, other: Schema) -> Schema:
if not isinstance(other, Schema):
raise ValueError(f"Expected Schema, got other: {type(other)}")
(Diffs for the remaining 33 changed files are not shown.)
