From 573ea1229fd648c192be1b365bc12a36e98a66b0 Mon Sep 17 00:00:00 2001
From: Colin
Date: Fri, 9 Feb 2024 17:56:56 +0800
Subject: [PATCH 1/7] [BUG] Schema hints not working properly for json reads (#1845)

Schema hints were not being propagated, which caused fields to be dropped.
This stemmed from an issue when reading large JSON files from S3 where the
fields changed only late in the file, so schema inference did not pick them up.
---
 src/daft-micropartition/src/micropartition.rs |  2 +-
 tests/dataframe/test_creation.py              | 30 +++++++++++++++++++
 2 files changed, 31 insertions(+), 1 deletion(-)

diff --git a/src/daft-micropartition/src/micropartition.rs b/src/daft-micropartition/src/micropartition.rs
index 647a9b2117..8fceda4810 100644
--- a/src/daft-micropartition/src/micropartition.rs
+++ b/src/daft-micropartition/src/micropartition.rs
@@ -189,7 +189,7 @@ fn materialize_scan_task(
                     column_names
                         .as_ref()
                         .map(|cols| cols.iter().map(|col| col.to_string()).collect()),
-                    None,
+                    Some(scan_task.schema.clone()),
                     scan_task.pushdowns.filters.clone(),
                 );
                 let parse_options = JsonParseOptions::new_internal();
diff --git a/tests/dataframe/test_creation.py b/tests/dataframe/test_creation.py
index ed93d18451..7d92478ecd 100644
--- a/tests/dataframe/test_creation.py
+++ b/tests/dataframe/test_creation.py
@@ -797,6 +797,36 @@ def test_create_dataframe_json_schema_hints_ignore_random_hint(valid_data: list[
     assert len(pd_df) == len(valid_data)
 
 
+def test_create_dataframe_json_schema_hints_two_files() -> None:
+    with create_temp_filename() as fname, create_temp_filename() as fname2:
+        with open(fname, "w") as f:
+            f.write(json.dumps({"foo": {"bar": "baz"}}))
+            f.write("\n")
+            f.flush()
+
+        with open(fname2, "w") as f:
+            f.write(json.dumps({"foo": {"bar2": "baz2"}}))
+            f.write("\n")
+            f.flush()
+
+        # Without schema hints, schema inference should not pick up bar2
+        assert daft.read_json([fname, fname2]).schema()["foo"].dtype == DataType.struct({"bar": DataType.string()})
+
+        # With schema hints, bar2 should be included
+        df = daft.read_json(
+            [fname, fname2],
+            schema_hints={"foo": DataType.struct({"bar": DataType.string(), "bar2": DataType.string()})},
+        )
+        assert df.schema()["foo"].dtype == DataType.struct({"bar": DataType.string(), "bar2": DataType.string()})
+
+        # When the dataframe is materialized, the schema hints should be enforced and bar2 should be included
+        df = df.select(df["foo"].struct.get("bar2"))
+        df = df.where(df["bar2"].not_null()).collect()
+
+        assert len(df) == 1
+        assert df.to_pydict()["bar2"][0] == "baz2"
+
+
 @pytest.mark.parametrize(
     "input,expected",
     [

From 9b6cb946471b45a2b9f98e0ed233f624db348bdf Mon Sep 17 00:00:00 2001
From: Sammy Sidhu
Date: Fri, 9 Feb 2024 12:58:03 -0800
Subject: [PATCH 2/7] [BUG] Protect Global Context With Mutex (#1857)

* Fixes a race condition when Daft is run with multiple threads and creates
  multiple Runners. This leads to cached partition sets being dropped, as seen
  in https://github.com/Eventual-Inc/Daft/issues/1843.
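
To illustrate the failure mode, here is a minimal sketch (not part of this patch) of the kind of
multithreaded usage that could previously end up constructing more than one Runner. The
`daft.from_pydict` and `collect` calls are real APIs used elsewhere in this series, but the repro
script itself is hypothetical:

```python
# Hypothetical repro sketch: before this fix, concurrent first-time queries could
# each initialize their own Runner through the unsynchronized global DaftContext,
# so partition sets cached against one runner could be dropped.
import threading

import daft


def run_small_query() -> None:
    df = daft.from_pydict({"x": [1, 2, 3]})
    df.collect()  # executes through the global context's runner


threads = [threading.Thread(target=run_small_query) for _ in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

With `DaftContext` now a lock-protected singleton, all threads resolve to the same Runner and its
partition set cache.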
closes: https://github.com/Eventual-Inc/Daft/issues/1843
---
 daft/context.py              | 153 ++++++++++++++++++++++-------------
 daft/runners/partitioning.py |  28 ++++---
 daft/runners/pyrunner.py     |   6 +-
 daft/runners/ray_runner.py   |   7 +-
 4 files changed, 118 insertions(+), 76 deletions(-)

diff --git a/daft/context.py b/daft/context.py
index 84e219e27f..d12e34fd71 100644
--- a/daft/context.py
+++ b/daft/context.py
@@ -13,6 +13,8 @@
 logger = logging.getLogger(__name__)
 
+import threading
+
 
 class _RunnerConfig:
     name = ClassVar[str]
@@ -59,28 +61,60 @@ class DaftContext:
 
     # When a dataframe is executed, this config is copied into the Runner
     # which then keeps track of a per-unique-execution-ID copy of the config, using it consistently throughout the execution
-    daft_execution_config: PyDaftExecutionConfig = PyDaftExecutionConfig()
+    _daft_execution_config: PyDaftExecutionConfig = PyDaftExecutionConfig()
 
     # Non-execution calls (e.g. creation of a dataframe, logical plan building etc) directly reference values in this config
-    daft_planning_config: PyDaftPlanningConfig = PyDaftPlanningConfig()
+    _daft_planning_config: PyDaftPlanningConfig = PyDaftPlanningConfig()
 
-    runner_config: _RunnerConfig = dataclasses.field(default_factory=_get_runner_config_from_env)
-    disallow_set_runner: bool = False
+    _runner_config: _RunnerConfig = dataclasses.field(default_factory=_get_runner_config_from_env)
+    _disallow_set_runner: bool = False
     _runner: Runner | None = None
 
+    _instance: ClassVar[DaftContext | None] = None
+    _lock: ClassVar[threading.Lock] = threading.Lock()
+
+    def __new__(cls):
+        if cls._instance is None:
+            with cls._lock:
+                # Another thread could have created the instance
+                # before we acquired the lock. So check that the
+                # instance is still nonexistent.
+ if not cls._instance: + cls._instance = super().__new__(cls) + return cls._instance + def runner(self) -> Runner: + with self._lock: + return self._get_runner() + + @property + def daft_execution_config(self) -> PyDaftExecutionConfig: + with self._lock: + return self._daft_execution_config + + @property + def daft_planning_config(self) -> PyDaftPlanningConfig: + with self._lock: + return self._daft_planning_config + + @property + def runner_config(self) -> _RunnerConfig: + with self._lock: + return self._runner_config + + def _get_runner(self) -> Runner: if self._runner is not None: return self._runner - if self.runner_config.name == "ray": + if self._runner_config.name == "ray": from daft.runners.ray_runner import RayRunner - assert isinstance(self.runner_config, _RayRunnerConfig) + assert isinstance(self._runner_config, _RayRunnerConfig) self._runner = RayRunner( - address=self.runner_config.address, - max_task_backlog=self.runner_config.max_task_backlog, + address=self._runner_config.address, + max_task_backlog=self._runner_config.max_task_backlog, ) - elif self.runner_config.name == "py": + elif self._runner_config.name == "py": from daft.runners.pyrunner import PyRunner try: @@ -96,21 +130,22 @@ def runner(self) -> Runner: except ImportError: pass - assert isinstance(self.runner_config, _PyRunnerConfig) - self._runner = PyRunner(use_thread_pool=self.runner_config.use_thread_pool) + assert isinstance(self._runner_config, _PyRunnerConfig) + self._runner = PyRunner(use_thread_pool=self._runner_config.use_thread_pool) else: - raise NotImplementedError(f"Runner config implemented: {self.runner_config.name}") + raise NotImplementedError(f"Runner config implemented: {self._runner_config.name}") # Mark DaftContext as having the runner set, which prevents any subsequent setting of the config # after the runner has been initialized once - self.disallow_set_runner = True + self._disallow_set_runner = True return self._runner @property def is_ray_runner(self) -> bool: - return isinstance(self.runner_config, _RayRunnerConfig) + with self._lock: + return isinstance(self._runner_config, _RayRunnerConfig) _DaftContext = DaftContext() @@ -144,20 +179,21 @@ def set_runner_ray( DaftContext: Daft context after setting the Ray runner """ ctx = get_context() - if ctx.disallow_set_runner: - if noop_if_initialized: - warnings.warn( - "Calling daft.context.set_runner_ray(noop_if_initialized=True) multiple times has no effect beyond the first call." - ) - return ctx - raise RuntimeError("Cannot set runner more than once") - - ctx.runner_config = _RayRunnerConfig( - address=address, - max_task_backlog=max_task_backlog, - ) - ctx.disallow_set_runner = True - return ctx + with ctx._lock: + if ctx._disallow_set_runner: + if noop_if_initialized: + warnings.warn( + "Calling daft.context.set_runner_ray(noop_if_initialized=True) multiple times has no effect beyond the first call." 
+ ) + return ctx + raise RuntimeError("Cannot set runner more than once") + + ctx._runner_config = _RayRunnerConfig( + address=address, + max_task_backlog=max_task_backlog, + ) + ctx._disallow_set_runner = True + return ctx def set_runner_py(use_thread_pool: bool | None = None) -> DaftContext: @@ -169,12 +205,13 @@ def set_runner_py(use_thread_pool: bool | None = None) -> DaftContext: DaftContext: Daft context after setting the Py runner """ ctx = get_context() - if ctx.disallow_set_runner: - raise RuntimeError("Cannot set runner more than once") + with ctx._lock: + if ctx._disallow_set_runner: + raise RuntimeError("Cannot set runner more than once") - ctx.runner_config = _PyRunnerConfig(use_thread_pool=use_thread_pool) - ctx.disallow_set_runner = True - return ctx + ctx._runner_config = _PyRunnerConfig(use_thread_pool=use_thread_pool) + ctx._disallow_set_runner = True + return ctx def set_planning_config( @@ -192,13 +229,14 @@ def set_planning_config( """ # Replace values in the DaftPlanningConfig with user-specified overrides ctx = get_context() - old_daft_planning_config = ctx.daft_planning_config if config is None else config - new_daft_planning_config = old_daft_planning_config.with_config_values( - default_io_config=default_io_config, - ) + with ctx._lock: + old_daft_planning_config = ctx._daft_planning_config if config is None else config + new_daft_planning_config = old_daft_planning_config.with_config_values( + default_io_config=default_io_config, + ) - ctx.daft_planning_config = new_daft_planning_config - return ctx + ctx._daft_planning_config = new_daft_planning_config + return ctx def set_execution_config( @@ -246,21 +284,22 @@ def set_execution_config( """ # Replace values in the DaftExecutionConfig with user-specified overrides ctx = get_context() - old_daft_execution_config = ctx.daft_execution_config if config is None else config - new_daft_execution_config = old_daft_execution_config.with_config_values( - scan_tasks_min_size_bytes=scan_tasks_min_size_bytes, - scan_tasks_max_size_bytes=scan_tasks_max_size_bytes, - broadcast_join_size_bytes_threshold=broadcast_join_size_bytes_threshold, - parquet_split_row_groups_max_files=parquet_split_row_groups_max_files, - sort_merge_join_sort_with_aligned_boundaries=sort_merge_join_sort_with_aligned_boundaries, - sample_size_for_sort=sample_size_for_sort, - num_preview_rows=num_preview_rows, - parquet_target_filesize=parquet_target_filesize, - parquet_target_row_group_size=parquet_target_row_group_size, - parquet_inflation_factor=parquet_inflation_factor, - csv_target_filesize=csv_target_filesize, - csv_inflation_factor=csv_inflation_factor, - ) - - ctx.daft_execution_config = new_daft_execution_config - return ctx + with ctx._lock: + old_daft_execution_config = ctx._daft_execution_config if config is None else config + new_daft_execution_config = old_daft_execution_config.with_config_values( + scan_tasks_min_size_bytes=scan_tasks_min_size_bytes, + scan_tasks_max_size_bytes=scan_tasks_max_size_bytes, + broadcast_join_size_bytes_threshold=broadcast_join_size_bytes_threshold, + parquet_split_row_groups_max_files=parquet_split_row_groups_max_files, + sort_merge_join_sort_with_aligned_boundaries=sort_merge_join_sort_with_aligned_boundaries, + sample_size_for_sort=sample_size_for_sort, + num_preview_rows=num_preview_rows, + parquet_target_filesize=parquet_target_filesize, + parquet_target_row_group_size=parquet_target_row_group_size, + parquet_inflation_factor=parquet_inflation_factor, + csv_target_filesize=csv_target_filesize, + 
csv_inflation_factor=csv_inflation_factor, + ) + + ctx._daft_execution_config = new_daft_execution_config + return ctx diff --git a/daft/runners/partitioning.py b/daft/runners/partitioning.py index e43d6eaee1..8836a3bc5c 100644 --- a/daft/runners/partitioning.py +++ b/daft/runners/partitioning.py @@ -1,6 +1,7 @@ from __future__ import annotations import sys +import threading import weakref from abc import abstractmethod from dataclasses import dataclass @@ -296,24 +297,33 @@ def size_bytes(self) -> int | None: class PartitionSetCache: def __init__(self) -> None: - self._uuid_to_partition_set: weakref.WeakValueDictionary[ + self.__uuid_to_partition_set: weakref.WeakValueDictionary[ str, PartitionCacheEntry ] = weakref.WeakValueDictionary() + self._lock = threading.Lock() def get_partition_set(self, pset_id: str) -> PartitionCacheEntry: - assert pset_id in self._uuid_to_partition_set - return self._uuid_to_partition_set[pset_id] + with self._lock: + assert pset_id in self.__uuid_to_partition_set + return self.__uuid_to_partition_set[pset_id] + + def get_all_partition_sets(self) -> dict[str, PartitionSet]: + with self._lock: + return {key: entry.value for key, entry in self.__uuid_to_partition_set.items() if entry.value is not None} def put_partition_set(self, pset: PartitionSet) -> PartitionCacheEntry: pset_id = uuid4().hex part_entry = PartitionCacheEntry(pset_id, pset) - self._uuid_to_partition_set[pset_id] = part_entry - return part_entry + with self._lock: + self.__uuid_to_partition_set[pset_id] = part_entry + return part_entry def rm(self, pset_id: str) -> None: - if pset_id in self._uuid_to_partition_set: - del self._uuid_to_partition_set[pset_id] + with self._lock: + if pset_id in self.__uuid_to_partition_set: + del self.__uuid_to_partition_set[pset_id] def clear(self) -> None: - del self._uuid_to_partition_set - self._uuid_to_partition_set = weakref.WeakValueDictionary() + with self._lock: + del self.__uuid_to_partition_set + self.__uuid_to_partition_set = weakref.WeakValueDictionary() diff --git a/daft/runners/pyrunner.py b/daft/runners/pyrunner.py index 6b6a7e98ba..2be28e4c54 100644 --- a/daft/runners/pyrunner.py +++ b/daft/runners/pyrunner.py @@ -148,11 +148,7 @@ def run_iter( # Finalize the logical plan and get a physical plan scheduler for translating the # physical plan to executable tasks. plan_scheduler = builder.to_physical_plan_scheduler(daft_execution_config) - psets = { - key: entry.value.values() - for key, entry in self._part_set_cache._uuid_to_partition_set.items() - if entry.value is not None - } + psets = {k: v.values() for k, v in self._part_set_cache.get_all_partition_sets().items()} # Get executable tasks from planner. tasks = plan_scheduler.to_partition_tasks(psets, is_ray_runner=False) with profiler("profile_PyRunner.run_{datetime.now().isoformat()}.json"): diff --git a/daft/runners/ray_runner.py b/daft/runners/ray_runner.py index d765a313d5..f8803995c0 100644 --- a/daft/runners/ray_runner.py +++ b/daft/runners/ray_runner.py @@ -727,11 +727,8 @@ def run_iter( # physical plan to executable tasks. 
plan_scheduler = builder.to_physical_plan_scheduler(daft_execution_config) - psets = { - key: entry.value.values() - for key, entry in self._part_set_cache._uuid_to_partition_set.items() - if entry.value is not None - } + psets = {k: v.values() for k, v in self._part_set_cache.get_all_partition_sets().items()} + result_uuid = str(uuid.uuid4()) if isinstance(self.ray_context, ray.client_builder.ClientContext): ray.get( From 05e3e3fbbefdaa1780484e58b2c002e4f6b9ec6b Mon Sep 17 00:00:00 2001 From: Matt H Date: Sun, 11 Feb 2024 18:32:07 -0500 Subject: [PATCH 3/7] [FEAT] Add ceil function (#1867) Adding the `ceil` function to match https://ibis-project.org/reference/expression-numeric#ibis.expr.types.numeric.NumericValue.ceil. Added tests with example usage. This is my first PR here. Happy to take on more of the functions in #1037 after getting through this one. --- daft/daft.pyi | 2 + daft/expressions/expressions.py | 5 +++ daft/series.py | 3 ++ docs/source/api_docs/expressions.rst | 1 + src/daft-core/src/array/ops/ceil.rs | 18 +++++++++ src/daft-core/src/array/ops/mod.rs | 1 + src/daft-core/src/python/series.rs | 4 ++ src/daft-core/src/series/ops/ceil.rs | 20 ++++++++++ src/daft-core/src/series/ops/mod.rs | 1 + src/daft-dsl/src/functions/numeric/ceil.rs | 41 +++++++++++++++++++++ src/daft-dsl/src/functions/numeric/mod.rs | 12 ++++++ src/daft-dsl/src/python.rs | 5 +++ tests/expressions/test_expressions.py | 9 +++++ tests/expressions/typing/test_arithmetic.py | 10 +++++ tests/table/test_eval.py | 23 ++++++++++++ 15 files changed, 155 insertions(+) create mode 100644 src/daft-core/src/array/ops/ceil.rs create mode 100644 src/daft-core/src/series/ops/ceil.rs create mode 100644 src/daft-dsl/src/functions/numeric/ceil.rs diff --git a/daft/daft.pyi b/daft/daft.pyi index 6d2b51ebfe..9361906b84 100644 --- a/daft/daft.pyi +++ b/daft/daft.pyi @@ -832,6 +832,7 @@ class PyExpr: def _is_column(self) -> bool: ... def alias(self, name: str) -> PyExpr: ... def cast(self, dtype: PyDataType) -> PyExpr: ... + def ceil(self) -> PyExpr: ... def if_else(self, if_true: PyExpr, if_false: PyExpr) -> PyExpr: ... def count(self, mode: CountMode) -> PyExpr: ... def sum(self) -> PyExpr: ... @@ -940,6 +941,7 @@ class PySeries: def _max(self) -> PySeries: ... def _agg_list(self) -> PySeries: ... def cast(self, dtype: PyDataType) -> PySeries: ... + def ceil(self) -> PySeries: ... @staticmethod def concat(series: list[PySeries]) -> PySeries: ... def __len__(self) -> int: ... 
diff --git a/daft/expressions/expressions.py b/daft/expressions/expressions.py index 11d1277fa0..5ee05c4cd4 100644 --- a/daft/expressions/expressions.py +++ b/daft/expressions/expressions.py @@ -315,6 +315,11 @@ def cast(self, dtype: DataType) -> Expression: expr = self._expr.cast(dtype._dtype) return Expression._from_pyexpr(expr) + def ceil(self) -> Expression: + """The ceiling of a numeric expression (``expr.ceil()``)""" + expr = self._expr.ceil() + return Expression._from_pyexpr(expr) + def _count(self, mode: CountMode = CountMode.Valid) -> Expression: expr = self._expr.count(mode) return Expression._from_pyexpr(expr) diff --git a/daft/series.py b/daft/series.py index ffcaa8d74e..1f4782b4f0 100644 --- a/daft/series.py +++ b/daft/series.py @@ -349,6 +349,9 @@ def size_bytes(self) -> int: def __abs__(self) -> Series: return Series._from_pyseries(abs(self._series)) + def ceil(self) -> Series: + return Series._from_pyseries(self._series.ceil()) + def __add__(self, other: object) -> Series: if not isinstance(other, Series): raise TypeError(f"expected another Series but got {type(other)}") diff --git a/docs/source/api_docs/expressions.rst b/docs/source/api_docs/expressions.rst index e586a8c2b9..e977c849d8 100644 --- a/docs/source/api_docs/expressions.rst +++ b/docs/source/api_docs/expressions.rst @@ -46,6 +46,7 @@ Numeric Expression.__mul__ Expression.__truediv__ Expression.__mod__ + Expression.ceil .. _api-comparison-expression: diff --git a/src/daft-core/src/array/ops/ceil.rs b/src/daft-core/src/array/ops/ceil.rs new file mode 100644 index 0000000000..3d677fff65 --- /dev/null +++ b/src/daft-core/src/array/ops/ceil.rs @@ -0,0 +1,18 @@ +use num_traits::Float; + +use crate::{ + array::DataArray, + datatypes::{DaftFloatType, DaftNumericType}, +}; + +use common_error::DaftResult; + +impl DataArray +where + T: DaftNumericType, + T::Native: Float, +{ + pub fn ceil(&self) -> DaftResult { + self.apply(|v| v.ceil()) + } +} diff --git a/src/daft-core/src/array/ops/mod.rs b/src/daft-core/src/array/ops/mod.rs index ba69e727b6..d9d66cfec6 100644 --- a/src/daft-core/src/array/ops/mod.rs +++ b/src/daft-core/src/array/ops/mod.rs @@ -6,6 +6,7 @@ pub mod arrow2; pub mod as_arrow; pub(crate) mod broadcast; pub(crate) mod cast; +mod ceil; mod compare_agg; mod comparison; mod concat; diff --git a/src/daft-core/src/python/series.rs b/src/daft-core/src/python/series.rs index d21def6a8f..e89b206399 100644 --- a/src/daft-core/src/python/series.rs +++ b/src/daft-core/src/python/series.rs @@ -108,6 +108,10 @@ impl PySeries { Ok(self.series.xor(&other.series)?.into_series().into()) } + pub fn ceil(&self) -> PyResult { + Ok(self.series.ceil()?.into()) + } + pub fn take(&self, idx: &Self) -> PyResult { Ok(self.series.take(&idx.series)?.into()) } diff --git a/src/daft-core/src/series/ops/ceil.rs b/src/daft-core/src/series/ops/ceil.rs new file mode 100644 index 0000000000..3a01f045a6 --- /dev/null +++ b/src/daft-core/src/series/ops/ceil.rs @@ -0,0 +1,20 @@ +use crate::datatypes::DataType; +use crate::series::Series; +use common_error::DaftError; +use common_error::DaftResult; +impl Series { + pub fn ceil(&self) -> DaftResult { + use crate::series::array_impl::IntoSeries; + + use DataType::*; + match self.data_type() { + Int8 | Int16 | Int32 | Int64 | UInt8 | UInt16 | UInt32 | UInt64 => Ok(self.clone()), + Float32 => Ok(self.f32().unwrap().ceil()?.into_series()), + Float64 => Ok(self.f64().unwrap().ceil()?.into_series()), + dt => Err(DaftError::TypeError(format!( + "ceil not implemented for {}", + dt + ))), + } + } +} diff 
--git a/src/daft-core/src/series/ops/mod.rs b/src/daft-core/src/series/ops/mod.rs index e328be0cd3..eb0f75b245 100644 --- a/src/daft-core/src/series/ops/mod.rs +++ b/src/daft-core/src/series/ops/mod.rs @@ -8,6 +8,7 @@ pub mod agg; pub mod arithmetic; pub mod broadcast; pub mod cast; +pub mod ceil; pub mod comparison; pub mod concat; pub mod date; diff --git a/src/daft-dsl/src/functions/numeric/ceil.rs b/src/daft-dsl/src/functions/numeric/ceil.rs new file mode 100644 index 0000000000..9eec7d09f3 --- /dev/null +++ b/src/daft-dsl/src/functions/numeric/ceil.rs @@ -0,0 +1,41 @@ +use common_error::{DaftError, DaftResult}; +use daft_core::{datatypes::Field, schema::Schema, series::Series}; + +use crate::Expr; + +use super::super::FunctionEvaluator; + +pub(super) struct CeilEvaluator {} + +impl FunctionEvaluator for CeilEvaluator { + fn fn_name(&self) -> &'static str { + "ceil" + } + + fn to_field(&self, inputs: &[Expr], schema: &Schema, _: &Expr) -> DaftResult { + if inputs.len() != 1 { + return Err(DaftError::SchemaMismatch(format!( + "Expected 1 input arg, got {}", + inputs.len() + ))); + } + let field = inputs.first().unwrap().to_field(schema)?; + if !field.dtype.is_numeric() { + return Err(DaftError::TypeError(format!( + "Expected input to ceil to be numeric, got {}", + field.dtype + ))); + } + Ok(field) + } + + fn evaluate(&self, inputs: &[Series], _: &Expr) -> DaftResult { + if inputs.len() != 1 { + return Err(DaftError::ValueError(format!( + "Expected 1 input arg, got {}", + inputs.len() + ))); + } + inputs.first().unwrap().ceil() + } +} diff --git a/src/daft-dsl/src/functions/numeric/mod.rs b/src/daft-dsl/src/functions/numeric/mod.rs index e8706192aa..7e54d83c68 100644 --- a/src/daft-dsl/src/functions/numeric/mod.rs +++ b/src/daft-dsl/src/functions/numeric/mod.rs @@ -1,6 +1,9 @@ mod abs; +mod ceil; use abs::AbsEvaluator; +use ceil::CeilEvaluator; + use serde::{Deserialize, Serialize}; use crate::Expr; @@ -10,6 +13,7 @@ use super::FunctionEvaluator; #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)] pub enum NumericExpr { Abs, + Ceil, } impl NumericExpr { @@ -18,6 +22,7 @@ impl NumericExpr { use NumericExpr::*; match self { Abs => &AbsEvaluator {}, + Ceil => &CeilEvaluator {}, } } } @@ -28,3 +33,10 @@ pub fn abs(input: &Expr) -> Expr { inputs: vec![input.clone()], } } + +pub fn ceil(input: &Expr) -> Expr { + Expr::Function { + func: super::FunctionExpr::Numeric(NumericExpr::Ceil), + inputs: vec![input.clone()], + } +} diff --git a/src/daft-dsl/src/python.rs b/src/daft-dsl/src/python.rs index eff9bc2738..f135b96bb4 100644 --- a/src/daft-dsl/src/python.rs +++ b/src/daft-dsl/src/python.rs @@ -144,6 +144,11 @@ impl PyExpr { Ok(self.expr.cast(&dtype.into()).into()) } + pub fn ceil(&self) -> PyResult { + use functions::numeric::ceil; + Ok(ceil(&self.expr).into()) + } + pub fn if_else(&self, if_true: &Self, if_false: &Self) -> PyResult { Ok(self.expr.if_else(&if_true.expr, &if_false.expr).into()) } diff --git a/tests/expressions/test_expressions.py b/tests/expressions/test_expressions.py index 0884690963..84102ef625 100644 --- a/tests/expressions/test_expressions.py +++ b/tests/expressions/test_expressions.py @@ -86,6 +86,15 @@ def test_repr_functions_abs() -> None: assert repr_out == repr(copied) +def test_repr_functions_ceil() -> None: + a = col("a") + y = a.ceil() + repr_out = repr(y) + assert repr_out == "ceil(col(a))" + copied = copy.deepcopy(y) + assert repr_out == repr(copied) + + def test_repr_functions_day() -> None: a = col("a") y = a.dt.day() diff --git 
a/tests/expressions/typing/test_arithmetic.py b/tests/expressions/typing/test_arithmetic.py index 827aa85d7a..4d47a069a3 100644 --- a/tests/expressions/typing/test_arithmetic.py +++ b/tests/expressions/typing/test_arithmetic.py @@ -72,3 +72,13 @@ def test_abs(unary_data_fixture): run_kernel=lambda: abs(arg), resolvable=is_numeric(arg.datatype()), ) + + +def test_ceil(unary_data_fixture): + arg = unary_data_fixture + assert_typing_resolve_vs_runtime_behavior( + data=(unary_data_fixture,), + expr=col(arg.name()).ceil(), + run_kernel=lambda: arg.ceil(), + resolvable=is_numeric(arg.datatype()), + ) diff --git a/tests/table/test_eval.py b/tests/table/test_eval.py index 5f6d3948b8..b87b3bd6e1 100644 --- a/tests/table/test_eval.py +++ b/tests/table/test_eval.py @@ -1,6 +1,7 @@ from __future__ import annotations import itertools +import math import operator as ops import pyarrow as pa @@ -157,3 +158,25 @@ def test_table_abs_bad_input() -> None: with pytest.raises(ValueError, match="Expected input to abs to be numeric"): table.eval_expression_list([abs(col("a"))]) + + +def test_table_numeric_ceil() -> None: + table = MicroPartition.from_pydict( + {"a": [None, -1.0, -0.5, 0, 0.5, 2, None], "b": [-1.7, -1.5, -1.3, 0.3, 0.7, None, None]} + ) + + ceil_table = table.eval_expression_list([col("a").ceil(), col("b").ceil()]) + + assert [math.ceil(v) if v is not None else v for v in table.get_column("a").to_pylist()] == ceil_table.get_column( + "a" + ).to_pylist() + assert [math.ceil(v) if v is not None else v for v in table.get_column("b").to_pylist()] == ceil_table.get_column( + "b" + ).to_pylist() + + +def test_table_ceil_bad_input() -> None: + table = MicroPartition.from_pydict({"a": ["a", "b", "c"]}) + + with pytest.raises(ValueError, match="Expected input to ceil to be numeric"): + table.eval_expression_list([col("a").ceil()]) From 8565272fee5c50cb5d668e56e3bd7b238a52ceaa Mon Sep 17 00:00:00 2001 From: Colin Date: Mon, 12 Feb 2024 08:39:03 +0800 Subject: [PATCH 4/7] [FEAT] Enable Requester Pay for S3 reads (#1856) This PR enables reads from S3 buckets with a requester pays policy. This configuration is set in the S3 IO Config. Todo: integration tests? will we need to set up a requester pays bucket for this? I did test it on the 1tr row parquets tho, so it is working. --- daft/daft.pyi | 3 +++ src/common/io-config/src/python.rs | 11 +++++++++++ src/common/io-config/src/s3.rs | 9 +++++++-- src/daft-io/src/s3_like.rs | 18 ++++++++++++++++++ 4 files changed, 39 insertions(+), 2 deletions(-) diff --git a/daft/daft.pyi b/daft/daft.pyi index 9361906b84..7113ba5957 100644 --- a/daft/daft.pyi +++ b/daft/daft.pyi @@ -422,6 +422,7 @@ class S3Config: anonymous: bool verify_ssl: bool check_hostname_ssl: bool + requester_pays: bool | None def __init__( self, @@ -439,6 +440,7 @@ class S3Config: anonymous: bool | None = None, verify_ssl: bool | None = None, check_hostname_ssl: bool | None = None, + requester_pays: bool | None = None, ): ... def replace( self, @@ -456,6 +458,7 @@ class S3Config: anonymous: bool | None = None, verify_ssl: bool | None = None, check_hostname_ssl: bool | None = None, + requester_pays: bool | None = None, ) -> S3Config: """Replaces values if provided, returning a new S3Config""" ... 
diff --git a/src/common/io-config/src/python.rs b/src/common/io-config/src/python.rs index 025ffcb9e5..9889e95a08 100644 --- a/src/common/io-config/src/python.rs +++ b/src/common/io-config/src/python.rs @@ -23,6 +23,7 @@ use crate::config; /// anonymous: Whether or not to use "anonymous mode", which will access S3 without any credentials /// verify_ssl: Whether or not to verify ssl certificates, which will access S3 without checking if the certs are valid, defaults to True /// check_hostname_ssl: Whether or not to verify the hostname when verifying ssl certificates, this was the legacy behavior for openssl, defaults to True +/// requester_pays: Whether or not the authenticated user will assume transfer costs, which is required by some providers of bulk data, defaults to False /// /// Example: /// >>> io_config = IOConfig(s3=S3Config(key_id="xxx", access_key="xxx")) @@ -182,6 +183,7 @@ impl S3Config { anonymous: Option, verify_ssl: Option, check_hostname_ssl: Option, + requester_pays: Option, ) -> Self { let def = crate::S3Config::default(); S3Config { @@ -202,6 +204,7 @@ impl S3Config { anonymous: anonymous.unwrap_or(def.anonymous), verify_ssl: verify_ssl.unwrap_or(def.verify_ssl), check_hostname_ssl: check_hostname_ssl.unwrap_or(def.check_hostname_ssl), + requester_pays: requester_pays.unwrap_or(def.requester_pays), }, } } @@ -223,6 +226,7 @@ impl S3Config { anonymous: Option, verify_ssl: Option, check_hostname_ssl: Option, + requester_pays: Option, ) -> Self { S3Config { config: crate::S3Config { @@ -242,6 +246,7 @@ impl S3Config { anonymous: anonymous.unwrap_or(self.config.anonymous), verify_ssl: verify_ssl.unwrap_or(self.config.verify_ssl), check_hostname_ssl: check_hostname_ssl.unwrap_or(self.config.check_hostname_ssl), + requester_pays: requester_pays.unwrap_or(self.config.requester_pays), }, } } @@ -333,6 +338,12 @@ impl S3Config { pub fn check_hostname_ssl(&self) -> PyResult> { Ok(Some(self.config.check_hostname_ssl)) } + + /// AWS Requester Pays + #[getter] + pub fn requester_pays(&self) -> PyResult> { + Ok(Some(self.config.requester_pays)) + } } #[pymethods] diff --git a/src/common/io-config/src/s3.rs b/src/common/io-config/src/s3.rs index 0a28fda55b..c2a7c66dae 100644 --- a/src/common/io-config/src/s3.rs +++ b/src/common/io-config/src/s3.rs @@ -20,6 +20,7 @@ pub struct S3Config { pub anonymous: bool, pub verify_ssl: bool, pub check_hostname_ssl: bool, + pub requester_pays: bool, } impl S3Config { @@ -57,6 +58,7 @@ impl S3Config { res.push(format!("Anonymous = {}", self.anonymous)); res.push(format!("Verify SSL = {}", self.verify_ssl)); res.push(format!("Check hostname SSL = {}", self.check_hostname_ssl)); + res.push(format!("Requester pays = {}", self.requester_pays)); res } } @@ -80,6 +82,7 @@ impl Default for S3Config { anonymous: false, verify_ssl: true, check_hostname_ssl: true, + requester_pays: false, } } } @@ -102,7 +105,8 @@ impl Display for S3Config { retry_mode: {:?}, anonymous: {}, verify_ssl: {}, - check_hostname_ssl: {}", + check_hostname_ssl: {} + requester_pays: {}", self.region_name, self.endpoint_url, self.key_id, @@ -116,7 +120,8 @@ impl Display for S3Config { self.retry_mode, self.anonymous, self.verify_ssl, - self.check_hostname_ssl + self.check_hostname_ssl, + self.requester_pays ) } } diff --git a/src/daft-io/src/s3_like.rs b/src/daft-io/src/s3_like.rs index c7b0cbf06a..4df15407f1 100644 --- a/src/daft-io/src/s3_like.rs +++ b/src/daft-io/src/s3_like.rs @@ -448,6 +448,12 @@ impl S3LikeSource { .bucket(bucket) .key(key); + let request = if 
self.s3_config.requester_pays { + request.request_payer(s3::types::RequestPayer::Requester) + } else { + request + }; + let request = match &range { None => request, Some(range) => request.range(format!( @@ -552,6 +558,12 @@ impl S3LikeSource { .bucket(bucket) .key(key); + let request = if self.s3_config.requester_pays { + request.request_payer(s3::types::RequestPayer::Requester) + } else { + request + }; + let response = if self.anonymous { request .customize_middleware() @@ -643,6 +655,12 @@ impl S3LikeSource { } else { request }; + let request = if self.s3_config.requester_pays { + request.request_payer(s3::types::RequestPayer::Requester) + } else { + request + }; + let response = if self.anonymous { request .customize_middleware() From fe92fe9a98819a94ec5faf12a42a4f75db0e7203 Mon Sep 17 00:00:00 2001 From: Jay Chia <17691182+jaychia@users.noreply.github.com> Date: Mon, 12 Feb 2024 12:05:04 -0800 Subject: [PATCH 5/7] [CHORE] Update segment logging to use restricted set of IDs (#1870) * Uses a new segment workspace * Update ID to restrict it to a set of 8000 unique IDs --------- Co-authored-by: Jay Chia --- daft/analytics.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/daft/analytics.py b/daft/analytics.py index 205a2c9398..4944aa2bad 100644 --- a/daft/analytics.py +++ b/daft/analytics.py @@ -9,16 +9,16 @@ import logging import os import platform +import random import time import urllib.error import urllib.request -import uuid from typing import Any, Callable from daft import context _ANALYTICS_CLIENT = None -_WRITE_KEY = "ebFETjqH70OOvtDvrlBC902iljBZGvPU" +_WRITE_KEY = "opL9scJXH6GKdIYgPdA0ncCj8i920LJq" _SEGMENT_BATCH_ENDPOINT = "https://api.segment.io/v1/batch" @@ -33,6 +33,11 @@ class AnalyticsEvent: data: dict[str, Any] +def _get_session_key(): + # Restrict the cardinality of keys to 8000 + return f"anon-{random.randint(1, 8000)}" + + def _build_segment_batch_payload( events: list[AnalyticsEvent], daft_version: str, daft_build_type: str ) -> dict[str, Any]: @@ -86,7 +91,7 @@ def __init__( ) -> None: self._daft_version = daft_version self._daft_build_type = daft_build_type - self._session_key = str(uuid.uuid4()) + self._session_key = _get_session_key() # Function to publish a payload to Segment self._publish = publish_payload_function From 4caeed412e50526d4172d5ade4acda727d71d752 Mon Sep 17 00:00:00 2001 From: Sammy Sidhu Date: Mon, 12 Feb 2024 12:38:26 -0800 Subject: [PATCH 6/7] [FEAT] show full schema on request (#1868) * Shows full schema when calling `df.schema()` * preserves current behavior when calling `repr(df)` closes: https://github.com/Eventual-Inc/Daft/issues/1725 image --- daft/daft.pyi | 1 + daft/logical/schema.py | 3 +++ daft/viz/dataframe_display.py | 2 +- src/daft-core/src/python/schema.rs | 4 ++++ src/daft-core/src/schema.rs | 22 +++++++++++++++---- src/daft-core/src/utils/display_table.rs | 28 ++++++++++++++++++++++++ tests/dataframe/test_repr.py | 14 ++++++++++-- tests/test_schema.py | 25 +++++++++++++++++++-- 8 files changed, 90 insertions(+), 9 deletions(-) diff --git a/daft/daft.pyi b/daft/daft.pyi index 7113ba5957..661e179284 100644 --- a/daft/daft.pyi +++ b/daft/daft.pyi @@ -828,6 +828,7 @@ class PySchema: def __reduce__(self) -> tuple: ... def __repr__(self) -> str: ... def _repr_html_(self) -> str: ... + def _truncated_table_string(self) -> str: ... class PyExpr: def _input_mapping(self) -> str | None: ... 
diff --git a/daft/logical/schema.py b/daft/logical/schema.py index c0a5455bd7..9a1950db14 100644 --- a/daft/logical/schema.py +++ b/daft/logical/schema.py @@ -132,6 +132,9 @@ def __repr__(self) -> str: def _repr_html_(self) -> str: return self._schema._repr_html_() + def _truncated_table_string(self) -> str: + return self._schema._truncated_table_string() + def union(self, other: Schema) -> Schema: if not isinstance(other, Schema): raise ValueError(f"Expected Schema, got other: {type(other)}") diff --git a/daft/viz/dataframe_display.py b/daft/viz/dataframe_display.py index e58830dbfc..7e66bfcc81 100644 --- a/daft/viz/dataframe_display.py +++ b/daft/viz/dataframe_display.py @@ -57,7 +57,7 @@ def __repr__(self) -> str: if self.preview.preview_partition is not None: res = repr(self.preview.preview_partition.to_table()) else: - res = repr(self.schema) + res = self.schema._truncated_table_string() res += f"\n{self._get_user_message()}" diff --git a/src/daft-core/src/python/schema.rs b/src/daft-core/src/python/schema.rs index 7408947c08..f4f50bdadf 100644 --- a/src/daft-core/src/python/schema.rs +++ b/src/daft-core/src/python/schema.rs @@ -70,6 +70,10 @@ impl PySchema { pub fn _repr_html_(&self) -> PyResult { Ok(self.schema.repr_html()) } + + pub fn _truncated_table_string(&self) -> PyResult { + Ok(self.schema.truncated_table_string()) + } } impl_bincode_py_state_serialization!(PySchema); diff --git a/src/daft-core/src/schema.rs b/src/daft-core/src/schema.rs index cbb96fe3e4..4d44c9cea1 100644 --- a/src/daft-core/src/schema.rs +++ b/src/daft-core/src/schema.rs @@ -9,7 +9,10 @@ use std::{ use indexmap::IndexMap; use serde::{Deserialize, Serialize}; -use crate::{datatypes::Field, utils::display_table::make_comfy_table}; +use crate::{ + datatypes::Field, + utils::display_table::{make_comfy_table, make_schema_vertical_table}, +}; use common_error::{DaftError, DaftResult}; @@ -143,6 +146,19 @@ impl Schema { .collect::>() .join(", ") } + + pub fn truncated_table_string(&self) -> String { + let table = make_comfy_table( + self.fields + .values() + .map(Cow::Borrowed) + .collect::>() + .as_slice(), + None, + None, + ); + format!("{}\n", table) + } } impl Eq for Schema {} @@ -183,14 +199,12 @@ impl Default for Schema { impl Display for Schema { // Produces an ASCII table. 
fn fmt(&self, f: &mut Formatter) -> Result { - let table = make_comfy_table( + let table = make_schema_vertical_table( self.fields .values() .map(Cow::Borrowed) .collect::>() .as_slice(), - None, - None, ); writeln!(f, "{table}") } diff --git a/src/daft-core/src/utils/display_table.rs b/src/daft-core/src/utils/display_table.rs index f16dbd1344..5bb2a31482 100644 --- a/src/daft-core/src/utils/display_table.rs +++ b/src/daft-core/src/utils/display_table.rs @@ -50,6 +50,34 @@ pub fn display_series_literal(series: &Series) -> String { } } +pub fn make_schema_vertical_table>(fields: &[F]) -> comfy_table::Table { + let mut table = comfy_table::Table::new(); + + let default_width_if_no_tty = 120usize; + + table + .load_preset(comfy_table::presets::UTF8_FULL) + .apply_modifier(comfy_table::modifiers::UTF8_ROUND_CORNERS) + .set_content_arrangement(comfy_table::ContentArrangement::Dynamic); + if table.width().is_none() && !table.is_tty() { + table.set_width(default_width_if_no_tty as u16); + } + + let header = vec![ + comfy_table::Cell::new("Column Name").add_attribute(comfy_table::Attribute::Bold), + comfy_table::Cell::new("Type").add_attribute(comfy_table::Attribute::Bold), + ]; + table.set_header(header); + + for f in fields.iter() { + table.add_row(vec![ + f.as_ref().name.to_string(), + format!("{}", f.as_ref().dtype), + ]); + } + table +} + pub fn make_comfy_table>( fields: &[F], columns: Option<&[&Series]>, diff --git a/tests/dataframe/test_repr.py b/tests/dataframe/test_repr.py index c0d8cef7dc..7b89f1a276 100644 --- a/tests/dataframe/test_repr.py +++ b/tests/dataframe/test_repr.py @@ -292,14 +292,24 @@ def test_repr_empty_struct(): data = {"empty_structs": [{}, {}], "nested_empty_structs": [{"a": {}}, {"b": {}}]} df = daft.from_pydict(data) - expected_schema_repr = """╭───────────────┬──────────────────────────────────╮ + expected_schema_truncated_repr = """╭───────────────┬──────────────────────────────────╮ │ empty_structs ┆ nested_empty_structs │ │ --- ┆ --- │ │ Struct[] ┆ Struct[a: Struct[], b: Struct[]] │ ╰───────────────┴──────────────────────────────────╯ """ + assert df.schema()._truncated_table_string() == expected_schema_truncated_repr + + expected_schema_repr = """╭──────────────────────┬──────────────────────────────────╮ +│ Column Name ┆ Type │ +╞══════════════════════╪══════════════════════════════════╡ +│ empty_structs ┆ Struct[] │ +├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤ +│ nested_empty_structs ┆ Struct[a: Struct[], b: Struct[]] │ +╰──────────────────────┴──────────────────────────────────╯ +""" + assert repr(df.schema()) == expected_schema_repr - assert str(df.schema()) == expected_schema_repr expected_repr = """╭───────────────┬──────────────────────────────────╮ │ empty_structs ┆ nested_empty_structs │ │ --- ┆ --- │ diff --git a/tests/test_schema.py b/tests/test_schema.py index b58e95d5bf..84402f6a2b 100644 --- a/tests/test_schema.py +++ b/tests/test_schema.py @@ -63,9 +63,9 @@ def test_schema_to_name_set(): assert schema.to_name_set() == set(DATA.keys()) -def test_repr(): +def test_truncated_repr(): schema = TABLE.schema() - out_repr = repr(schema) + out_repr = schema._truncated_table_string() without_escape = ANSI_ESCAPE.sub("", out_repr) assert ( without_escape.replace("\r", "") @@ -78,6 +78,27 @@ def test_repr(): ) +def test_repr(): + schema = TABLE.schema() + out_repr = repr(schema) + without_escape = ANSI_ESCAPE.sub("", out_repr) + assert ( + without_escape.replace("\r", "") + == """╭─────────────┬─────────╮ +│ Column Name ┆ Type │ 
+╞═════════════╪═════════╡
+│ int ┆ Int64 │
+├╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┤
+│ float ┆ Float64 │
+├╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┤
+│ string ┆ Utf8 │
+├╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┤
+│ bool ┆ Boolean │
+╰─────────────┴─────────╯
+"""
+    )
+
+
 def test_to_col_expr():
     schema = TABLE.schema()
     schema_col_exprs = ExpressionsProjection.from_schema(schema)

From adac24cee7a637866278e7f2d79376faf582e6c4 Mon Sep 17 00:00:00 2001
From: Sammy Sidhu
Date: Mon, 12 Feb 2024 18:07:03 -0800
Subject: [PATCH 7/7] [DOCS] Drop use of "Complex Data" in favor of multimodal (#1875)

---
 README.rst                                   | 21 ++++++++++++-------
 docs/source/_static/dataframe-comp-table.csv |  2 +-
 docs/source/faq/dataframe_comparison.rst     |  4 ++--
 docs/source/index.rst                        |  9 +++++++-
 .../basic_concepts/introduction.rst          | 12 +++++------
 .../user_guide/daft_in_depth/datatypes.rst   |  2 +-
 pyproject.toml                               |  2 +-
 7 files changed, 33 insertions(+), 19 deletions(-)

diff --git a/README.rst b/README.rst
index 824f4a04e5..a02a13c129 100644
--- a/README.rst
+++ b/README.rst
@@ -4,11 +4,18 @@
 `Website `_ • `Docs `_ • `Installation`_ • `10-minute tour of Daft `_ • `Community and Support `_
 
-Daft: the distributed Python dataframe for complex data
+Daft: Distributed dataframes for multimodal data
 =======================================================
 
-`Daft `_ is a fast, Pythonic and scalable open-source dataframe library built for Python and Machine Learning workloads.
+`Daft `_ is a distributed query engine for large-scale data processing in Python and is implemented in Rust.
+
+* **Familiar interactive API:** Lazy Python Dataframe for rapid and interactive iteration
+* **Focus on the what:** Powerful Query Optimizer that rewrites queries to be as efficient as possible
+* **Data Catalog integrations:** Full integration with data catalogs such as Apache Iceberg
+* **Rich multimodal type-system:** Supports multimodal types such as Images, URLs, Tensors and more
+* **Seamless Interchange**: Built on the `Apache Arrow `_ In-Memory Format
+* **Built for the cloud:** `Record-setting `_ I/O performance for integrations with S3 cloud storage
 
 **Table of Contents**
 
@@ -21,11 +28,11 @@ Daft: the distributed Python dataframe for complex data
 About Daft
 ----------
 
-The Daft dataframe is a table of data with rows and columns. Columns can contain any Python objects, which allows Daft to support rich complex data types such as images, audio, video and more.
+Daft was designed with the following principles in mind:
 
-1. **Any Data**: Beyond the usual strings/numbers/dates, Daft columns can also hold complex multimodal data such as Images, Embeddings and Python objects. Ingestion and basic transformations of complex data is extremely easy and performant in Daft.
-2. **Notebook Computing**: Daft is built for the interactive developer experience on a notebook - intelligent caching/query optimizations accelerates your experimentation and data exploration.
-3. **Distributed Computing**: Rich complex formats such as images can quickly outgrow your local laptop's computational resources - Daft integrates natively with `Ray `_ for running dataframes on large clusters of machines with thousands of CPUs/GPUs.
+1. **Any Data**: Beyond the usual strings/numbers/dates, Daft columns can also hold complex or nested multimodal data such as Images, Embeddings and Python objects efficiently with its Arrow-based memory representation. Ingestion and basic transformations of multimodal data are extremely easy and performant in Daft.
+2. 
**Interactive Computing**: Daft is built for the interactive developer experience through notebooks or REPLs - intelligent caching/query optimizations accelerates your experimentation and data exploration. +3. **Distributed Computing**: Some workloads can quickly outgrow your local laptop's computational resources - Daft integrates natively with `Ray `_ for running dataframes on large clusters of machines with thousands of CPUs/GPUs. Getting Started --------------- @@ -101,7 +108,7 @@ Related Projects ---------------- +---------------------------------------------------+-----------------+---------------+-------------+-----------------+-----------------------------+-------------+ -| Dataframe | Query Optimizer | Complex Types | Distributed | Arrow Backed | Vectorized Execution Engine | Out-of-core | +| Dataframe | Query Optimizer | Multimodal | Distributed | Arrow Backed | Vectorized Execution Engine | Out-of-core | +===================================================+=================+===============+=============+=================+=============================+=============+ | Daft | Yes | Yes | Yes | Yes | Yes | Yes | +---------------------------------------------------+-----------------+---------------+-------------+-----------------+-----------------------------+-------------+ diff --git a/docs/source/_static/dataframe-comp-table.csv b/docs/source/_static/dataframe-comp-table.csv index cfcfa785d9..0a192bd006 100644 --- a/docs/source/_static/dataframe-comp-table.csv +++ b/docs/source/_static/dataframe-comp-table.csv @@ -1,4 +1,4 @@ -Dataframe,Query Optimizer,Complex Types,Distributed,Arrow Backed,Vectorized Execution Engine,Out-of-core +Dataframe,Query Optimizer,Multimodal,Distributed,Arrow Backed,Vectorized Execution Engine,Out-of-core Daft,✅,✅,✅,✅,✅,✅ `Pandas `_,❌,Python object,❌,optional >= 2.0,some(Numpy),❌ `Polars `_,✅,Python object,❌,✅,✅,✅ diff --git a/docs/source/faq/dataframe_comparison.rst b/docs/source/faq/dataframe_comparison.rst index 5a76a2125a..742aacdd25 100644 --- a/docs/source/faq/dataframe_comparison.rst +++ b/docs/source/faq/dataframe_comparison.rst @@ -24,7 +24,7 @@ Pandas/Modin The main drawback of using Pandas is scalability. Pandas is single-threaded and not built for distributed computing. While this is not as much of a problem for purely tabular datasets, when dealing with data such as images/video your data can get very large and expensive to compute very quickly. -Modin is a project that provides "distributed Pandas". If the use-case is tabular, has code that is already written in Pandas but just needs to be scaled up to larger data, Modin may be a good choice. Modin aims to be 100% Pandas API compatible which means that certain operations that are important for performance in the world of complex data such as requesting for certain amount of resources (e.g. GPUs) is not yet possible. +Modin is a project that provides "distributed Pandas". If the use-case is tabular, has code that is already written in Pandas but just needs to be scaled up to larger data, Modin may be a good choice. Modin aims to be 100% Pandas API compatible which means that certain operations that are important for performance in the world of multimodal data such as requesting for certain amount of resources (e.g. GPUs) is not yet possible. Spark Dataframes ---------------- @@ -42,7 +42,7 @@ Spark excels at large scale tabular analytics, with support for running Python c #. 
Unravel the flattened array again on the other end * **Debugging:** Key features such as exposing print statements or breakpoints from user-defined functions to the user are missing, which make PySpark extremely difficult to develop on. -* **Lack of granular execution control:** with heavy processing of complex data, users often need more control around the execution and scheduling of their work. For example, users may need to ensure that Spark runs a single executor per GPU, but Spark's programming model makes this very difficult. +* **Lack of granular execution control:** with heavy processing of multimodal data, users often need more control around the execution and scheduling of their work. For example, users may need to ensure that Spark runs a single executor per GPU, but Spark's programming model makes this very difficult. * **Compatibility with downstream Machine Learning tasks:** Spark itself is not well suited for performing distributed ML training which is increasingly becoming the domain of frameworks such as Ray and Horovod. Integrating with such a solution is difficult and requires expert tuning of intermediate storage and data engineering solutions. Ray Datasets diff --git a/docs/source/index.rst b/docs/source/index.rst index 1de62eb697..e0ecc7ddfe 100644 --- a/docs/source/index.rst +++ b/docs/source/index.rst @@ -1,7 +1,14 @@ Daft Documentation ================== -Daft is a **fast and scalable Python dataframe** for complex data and machine learning workloads. +Daft is a distributed query engine for large-scale data processing in Python and is implemented in Rust. + +* **Familiar interactive API:** Lazy Python Dataframe for rapid and interactive iteration +* **Focus on the what:** Powerful Query Optimizer that rewrites queries to be as efficient as possible +* **Data Catalog integrations:** Full integration with data catalogs such as Apache Iceberg +* **Rich multimodal type-system:** Supports multimodal types such as Images, URLs, Tensors and more +* **Seamless Interchange**: Built on the `Apache Arrow `_ In-Memory Format +* **Built for the cloud:** `Record-setting `_ I/O performance for integrations with S3 cloud storage Installing Daft --------------- diff --git a/docs/source/user_guide/basic_concepts/introduction.rst b/docs/source/user_guide/basic_concepts/introduction.rst index d01d54532b..2fa1c8fa94 100644 --- a/docs/source/user_guide/basic_concepts/introduction.rst +++ b/docs/source/user_guide/basic_concepts/introduction.rst @@ -1,12 +1,12 @@ Introduction ============ -Daft is a data processing library that has two main classes: +Daft is a distributed query engine with a DataFrame API. The two key concepts to Daft are: -1. :class:`DataFrame `: a DataFrame consisting of rows and columns of data -2. :class:`Expression `: an expression representing some (delayed) computation to execute on columns of data +1. :class:`DataFrame `: a Table-like structure that represents rows and columns of data +2. :class:`Expression `: a symbolic representation of computation that transforms columns of the DataFrame to a new one. -With Daft, you create :class:`DataFrame ` from a variety of sources (e.g. reading data from files or from Python dictionaries) and use :class:`Expression ` to manipulate data in that DataFrame. Let's take a closer look at these two abstractions! +With Daft, you create :class:`DataFrame ` from a variety of sources (e.g. reading data from files, data catalogs or from Python dictionaries) and use :class:`Expression ` to manipulate data in that DataFrame. 
Let's take a closer look at these two abstractions! DataFrame --------- @@ -29,8 +29,8 @@ Using this abstraction of a DataFrame, you can run common tabular operations suc Daft DataFrames are: 1. **Distributed:** your data is split into *Partitions* and can be processed in parallel/on different machines -2. **Lazy:** computations are enqueued in a query plan, and only executed when requested -3. **Complex:** columns can contain complex datatypes such as tensors, images and Python objects +2. **Lazy:** computations are enqueued in a query plan which is then optimized and executed only when requested +3. **Multimodal:** columns can contain complex datatypes such as tensors, images and Python objects Since Daft is lazy, it can actually execute the query plan on a variety of different backends. By default, it will run computations locally using Python multithreading. However if you need to scale to large amounts of data that cannot be processed on a single machine, using the Ray runner allows Daft to run computations on a `Ray `_ cluster instead. diff --git a/docs/source/user_guide/daft_in_depth/datatypes.rst b/docs/source/user_guide/daft_in_depth/datatypes.rst index 9b57b41776..f2af4ef1f0 100644 --- a/docs/source/user_guide/daft_in_depth/datatypes.rst +++ b/docs/source/user_guide/daft_in_depth/datatypes.rst @@ -70,7 +70,7 @@ See also: Nested ------ -Nested DataTypes wrap other DataTypes, allowing you to compose types into complex datastructures. +Nested DataTypes wrap other DataTypes, allowing you to compose types into complex data structures. Examples: diff --git a/pyproject.toml b/pyproject.toml index 84fdfd2d0b..3575ce769f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -12,7 +12,7 @@ dependencies = [ "typing-extensions >= 4.0.0; python_version < '3.10'", "pickle5 >= 0.0.12; python_version < '3.8'" ] -description = "A Distributed DataFrame library for large scale complex data processing." +description = "Distributed Dataframes for Multimodal Data" dynamic = ["version"] license = {file = "LICENSE"} maintainers = [