From 6e28b3f39b38ddaa1d2696a22d733eb5c5567f44 Mon Sep 17 00:00:00 2001 From: Colin Ho Date: Thu, 7 Nov 2024 13:48:34 -0800 Subject: [PATCH] [FEAT] Native Runner (#3178) Makes swordfish a top level runner, `set_runner_native()`. Additionally sets swordfish to be the default runner for development. This PR also contains some bug fixes and test changes, of which I have left comments for. Additionally, this PR refactors swordfish in two ways: 1. Buffers scan tasks based on a `num_parallel_tasks` parameter, that takes into account any pushed down limits. 2. Adds an `is_err` check on the sender in parts of the code where we have a `while receiver.recv.await -> sender.send` pattern, such that it breaks out of the loop if the sender is dropped. This is needed in cases when the consumer is done receiving data, such as in a Limit, or if the user is doing `iter(df)` and breaks out of the iter, which will cause receivers to be dropped. As such, the senders should recognize this and drop as well. --------- Co-authored-by: Colin Ho Co-authored-by: Colin Ho --- .github/workflows/python-package.yml | 34 ++-- Cargo.lock | 2 +- daft/context.py | 62 +++++-- daft/io/writer.py | 3 +- daft/runners/native_runner.py | 88 +++++++++ daft/runners/pyrunner.py | 7 +- daft/runners/ray_runner.py | 4 + daft/runners/runner.py | 7 +- .../user_guide/poweruser/partitioning.rst | 2 +- src/common/daft-config/Cargo.toml | 1 + src/common/daft-config/src/lib.rs | 1 + src/daft-local-execution/Cargo.toml | 1 - src/daft-local-execution/src/dispatcher.rs | 20 ++- .../src/intermediate_ops/intermediate_op.rs | 16 +- src/daft-local-execution/src/pipeline.rs | 16 +- src/daft-local-execution/src/run.rs | 4 +- .../src/sinks/streaming_sink.rs | 22 ++- .../src/sources/empty_scan.rs | 19 +- .../src/sources/in_memory.rs | 25 +-- .../src/sources/scan_task.rs | 169 ++++++++++-------- .../src/sources/source.rs | 35 ++-- src/daft-local-plan/src/plan.rs | 5 +- src/daft-local-plan/src/translate.rs | 1 + tests/actor_pool/test_actor_cuda_devices.py | 3 +- tests/benchmarks/conftest.py | 2 - tests/benchmarks/test_local_tpch.py | 13 +- tests/benchmarks/test_streaming_writes.py | 20 +-- tests/conftest.py | 2 +- tests/cookbook/conftest.py | 2 +- tests/cookbook/test_joins.py | 2 +- tests/dataframe/test_joins.py | 2 +- .../test_monotonically_increasing_id.py | 2 +- .../io/test_s3_reads_include_path.py | 42 +++-- tests/io/delta_lake/test_table_write.py | 4 +- tests/io/iceberg/test_iceberg_writes.py | 4 +- tests/io/lancedb/test_lancedb_writes.py | 2 +- tests/io/test_write_modes.py | 6 - tests/test_resource_requests.py | 3 +- 38 files changed, 425 insertions(+), 228 deletions(-) create mode 100644 daft/runners/native_runner.py diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml index 6e71d16138..9aac6fb73c 100644 --- a/.github/workflows/python-package.yml +++ b/.github/workflows/python-package.yml @@ -25,13 +25,10 @@ jobs: fail-fast: false matrix: python-version: ['3.8', '3.10'] - daft-runner: [py, ray] + daft-runner: [py, ray, native] pyarrow-version: [7.0.0, 16.0.0] - enable-native-executor: [0, 1] os: [ubuntu-20.04, windows-latest] exclude: - - daft-runner: ray - enable-native-executor: 1 - daft-runner: ray pyarrow-version: 7.0.0 os: ubuntu-20.04 @@ -39,6 +36,10 @@ jobs: python-version: '3.10' pyarrow-version: 7.0.0 os: ubuntu-20.04 + - daft-runner: native + python-version: '3.10' + pyarrow-version: 7.0.0 + os: ubuntu-20.04 - python-version: '3.8' pyarrow-version: 16.0.0 - os: windows-latest @@ -118,7 +119,6 @@ jobs: 
CARGO_TARGET_DIR: ./target DAFT_RUNNER: ${{ matrix.daft-runner }} - DAFT_ENABLE_NATIVE_EXECUTOR: ${{ matrix.enable-native-executor }} - name: Build library and Test with pytest (Windows) if: ${{ (runner.os == 'Windows') }} @@ -220,7 +220,7 @@ jobs: fail-fast: false matrix: python-version: ['3.8'] - daft-runner: [py, ray] + daft-runner: [py, ray, native] steps: - uses: actions/checkout@v4 with: @@ -295,7 +295,7 @@ jobs: fail-fast: false matrix: python-version: ['3.8'] # can't use 3.7 due to requiring anon mode for adlfs - daft-runner: [py, ray] + daft-runner: [py, ray, native] steps: - uses: actions/checkout@v4 with: @@ -373,7 +373,7 @@ jobs: fail-fast: false matrix: python-version: ['3.8'] # can't use 3.7 due to requiring anon mode for adlfs - daft-runner: [py, ray] + daft-runner: [py, ray, native] # These permissions are needed to interact with GitHub's OIDC Token endpoint. # This is used in the step "Assume GitHub Actions AWS Credentials" permissions: @@ -467,11 +467,7 @@ jobs: fail-fast: false matrix: python-version: ['3.8'] # can't use 3.7 due to requiring anon mode for adlfs - daft-runner: [py, ray] - enable-native-executor: [0, 1] - exclude: - - daft-runner: ray - enable-native-executor: 1 + daft-runner: [py, ray, native] steps: - uses: actions/checkout@v4 with: @@ -517,7 +513,6 @@ jobs: pytest tests/integration/iceberg -m 'integration' --durations=50 env: DAFT_RUNNER: ${{ matrix.daft-runner }} - DAFT_ENABLE_NATIVE_EXECUTOR: ${{ matrix.enable-native-executor }} - name: Send Slack notification on failure uses: slackapi/slack-github-action@v1.27.0 if: ${{ failure() && (github.ref == 'refs/heads/main') }} @@ -549,11 +544,7 @@ jobs: fail-fast: false matrix: python-version: ['3.8'] # can't use 3.7 due to requiring anon mode for adlfs - daft-runner: [py, ray] - enable-native-executor: [0, 1] - exclude: - - daft-runner: ray - enable-native-executor: 1 + daft-runner: [py, ray, native] steps: - uses: actions/checkout@v4 with: @@ -598,7 +589,6 @@ jobs: pytest tests/integration/sql -m 'integration or not integration' --durations=50 env: DAFT_RUNNER: ${{ matrix.daft-runner }} - DAFT_ENABLE_NATIVE_EXECUTOR: ${{ matrix.enable-native-executor }} - name: Send Slack notification on failure uses: slackapi/slack-github-action@v1.27.0 if: ${{ failure() && (github.ref == 'refs/heads/main') }} @@ -681,6 +671,8 @@ jobs: uses: CodSpeedHQ/action@v3 with: run: pytest tests/benchmarks -m benchmark --codspeed + env: + DAFT_RUNNER: native - name: Send Slack notification on failure uses: slackapi/slack-github-action@v1.27.0 if: ${{ failure() && (github.ref == 'refs/heads/main') }} @@ -810,6 +802,8 @@ jobs: source activate maturin develop pytest --doctest-modules --continue-on-collection-errors daft/dataframe/dataframe.py daft/expressions/expressions.py daft/convert.py daft/udf.py + env: + DAFT_RUNNER: py publish-coverage-reports: name: Publish coverage reports to CodeCov diff --git a/Cargo.lock b/Cargo.lock index d964ae700f..46d304e3f6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1386,6 +1386,7 @@ version = "0.3.0-dev0" dependencies = [ "common-io-config", "common-py-serde", + "log", "pyo3", "serde", ] @@ -2118,7 +2119,6 @@ dependencies = [ "pyo3", "snafu", "tokio", - "tokio-stream", "tracing", ] diff --git a/daft/context.py b/daft/context.py index a7f1948bf3..780ef02ca5 100644 --- a/daft/context.py +++ b/daft/context.py @@ -7,6 +7,7 @@ import warnings from typing import TYPE_CHECKING, ClassVar +from daft import get_build_type from daft.daft import IOConfig, PyDaftExecutionConfig, PyDaftPlanningConfig if 
TYPE_CHECKING: @@ -27,6 +28,11 @@ class _PyRunnerConfig(_RunnerConfig): use_thread_pool: bool | None +@dataclasses.dataclass(frozen=True) +class _NativeRunnerConfig(_RunnerConfig): + name = "native" + + @dataclasses.dataclass(frozen=True) class _RayRunnerConfig(_RunnerConfig): name = "ray" @@ -42,6 +48,7 @@ def _get_runner_config_from_env() -> _RunnerConfig: 1. PyRunner: set DAFT_RUNNER=py 2. RayRunner: set DAFT_RUNNER=ray and optionally RAY_ADDRESS=ray://... + 3. NativeRunner: set DAFT_RUNNER=native """ runner_from_envvar = os.getenv("DAFT_RUNNER") @@ -90,6 +97,8 @@ def _get_runner_config_from_env() -> _RunnerConfig: ) elif runner_from_envvar and runner_from_envvar.upper() == "PY": return _PyRunnerConfig(use_thread_pool=use_thread_pool) + elif runner_from_envvar and runner_from_envvar.upper() == "NATIVE": + return _NativeRunnerConfig() elif runner_from_envvar is not None: raise ValueError(f"Unsupported DAFT_RUNNER variable: {runner_from_envvar}") @@ -101,6 +110,10 @@ def _get_runner_config_from_env() -> _RunnerConfig: force_client_mode=ray_force_client_mode, ) + # Use native runner if in dev mode + elif get_build_type() == "dev": + return _NativeRunnerConfig() + # Fall back on PyRunner else: return _PyRunnerConfig(use_thread_pool=use_thread_pool) @@ -118,7 +131,6 @@ class DaftContext: _daft_planning_config: PyDaftPlanningConfig = PyDaftPlanningConfig.from_env() _runner_config: _RunnerConfig | None = None - _disallow_set_runner: bool = False _runner: Runner | None = None _instance: ClassVar[DaftContext | None] = None @@ -178,14 +190,15 @@ def _get_runner(self) -> Runner: assert isinstance(runner_config, _PyRunnerConfig) self._runner = PyRunner(use_thread_pool=runner_config.use_thread_pool) + elif runner_config.name == "native": + from daft.runners.native_runner import NativeRunner + + assert isinstance(runner_config, _NativeRunnerConfig) + self._runner = NativeRunner() else: raise NotImplementedError(f"Runner config not implemented: {runner_config.name}") - # Mark DaftContext as having the runner set, which prevents any subsequent setting of the config - # after the runner has been initialized once - self._disallow_set_runner = True - return self._runner @property @@ -194,6 +207,17 @@ def is_ray_runner(self) -> bool: runner_config = self._get_runner_config() return isinstance(runner_config, _RayRunnerConfig) + def can_set_runner(self, new_runner_name: str) -> bool: + # If the runner has not been set yet, we can set it + if self._runner_config is None: + return True + # If the runner has been set to the ray runner, we can't set it again + elif self._runner_config.name == "ray": + return False + # If the runner has been set to a local runner, we can set it to a new local runner + else: + return new_runner_name in {"py", "native"} + _DaftContext = DaftContext() @@ -229,7 +253,7 @@ def set_runner_ray( ctx = get_context() with ctx._lock: - if ctx._disallow_set_runner: + if not ctx.can_set_runner("ray"): if noop_if_initialized: warnings.warn( "Calling daft.context.set_runner_ray(noop_if_initialized=True) multiple times has no effect beyond the first call." 
@@ -242,7 +266,7 @@ def set_runner_ray( max_task_backlog=max_task_backlog, force_client_mode=force_client_mode, ) - ctx._disallow_set_runner = True + ctx._runner = None return ctx @@ -256,11 +280,29 @@ def set_runner_py(use_thread_pool: bool | None = None) -> DaftContext: """ ctx = get_context() with ctx._lock: - if ctx._disallow_set_runner: + if not ctx.can_set_runner("py"): raise RuntimeError("Cannot set runner more than once") ctx._runner_config = _PyRunnerConfig(use_thread_pool=use_thread_pool) - ctx._disallow_set_runner = True + ctx._runner = None + return ctx + + +def set_runner_native() -> DaftContext: + """Set the runner for executing Daft dataframes to the native runner. + + Alternatively, users can set this behavior via an environment variable: DAFT_RUNNER=native + + Returns: + DaftContext: Daft context after setting the native runner + """ + ctx = get_context() + with ctx._lock: + if not ctx.can_set_runner("native"): + raise RuntimeError("Cannot set runner more than once") + + ctx._runner_config = _NativeRunnerConfig() + ctx._runner = None return ctx @@ -365,7 +407,7 @@ def set_execution_config( shuffle_aggregation_default_partitions: Maximum number of partitions to create when performing aggregations. Defaults to 200, unless the number of input partitions is less than 200. read_sql_partition_size_bytes: Target size of partition when reading from SQL databases. Defaults to 512MB enable_aqe: Enables Adaptive Query Execution, Defaults to False - enable_native_executor: Enables new local executor. Defaults to False + enable_native_executor: Enables the native executor, Defaults to False default_morsel_size: Default size of morsels used for the new local executor. Defaults to 131072 rows. """ # Replace values in the DaftExecutionConfig with user-specified overrides diff --git a/daft/io/writer.py b/daft/io/writer.py index 70a9ecda12..f3e94adf8b 100644 --- a/daft/io/writer.py +++ b/daft/io/writer.py @@ -175,8 +175,9 @@ def __init__( self.is_closed = False def _create_writer(self, schema: pa.Schema) -> pacsv.CSVWriter: + output_file = self.fs.open_output_stream(self.full_path) return pacsv.CSVWriter( - self.full_path, + output_file, schema, ) diff --git a/daft/runners/native_runner.py b/daft/runners/native_runner.py new file mode 100644 index 0000000000..0466a00dde --- /dev/null +++ b/daft/runners/native_runner.py @@ -0,0 +1,88 @@ +from __future__ import annotations + +import logging +from typing import TYPE_CHECKING, Iterator + +from daft.context import get_context +from daft.daft import FileFormatConfig, FileInfos, IOConfig +from daft.execution.native_executor import NativeExecutor +from daft.filesystem import glob_path_with_stats +from daft.runners import runner_io +from daft.runners.partitioning import ( + LocalMaterializedResult, + LocalPartitionSet, + PartitionCacheEntry, + PartitionSetCache, +) +from daft.runners.runner import LOCAL_PARTITION_SET_CACHE, Runner +from daft.table import MicroPartition + +if TYPE_CHECKING: + from daft.logical.builder import LogicalPlanBuilder + +logger = logging.getLogger(__name__) + + +class NativeRunnerIO(runner_io.RunnerIO): + def glob_paths_details( + self, + source_paths: list[str], + file_format_config: FileFormatConfig | None = None, + io_config: IOConfig | None = None, + ) -> FileInfos: + file_infos = FileInfos() + file_format = file_format_config.file_format() if file_format_config is not None else None + for source_path in source_paths: + path_file_infos = glob_path_with_stats(source_path, file_format, io_config) + + if 
len(path_file_infos) == 0: + raise FileNotFoundError(f"No files found at {source_path}") + + file_infos.extend(path_file_infos) + + return file_infos + + +class NativeRunner(Runner[MicroPartition]): + def __init__(self) -> None: + super().__init__() + + def initialize_partition_set_cache(self) -> PartitionSetCache: + return LOCAL_PARTITION_SET_CACHE + + def runner_io(self) -> NativeRunnerIO: + return NativeRunnerIO() + + def run(self, builder: LogicalPlanBuilder) -> PartitionCacheEntry: + results = list(self.run_iter(builder)) + + result_pset = LocalPartitionSet() + for i, result in enumerate(results): + result_pset.set_partition(i, result) + + pset_entry = self.put_partition_set_into_cache(result_pset) + return pset_entry + + def run_iter( + self, + builder: LogicalPlanBuilder, + results_buffer_size: int | None = None, + ) -> Iterator[LocalMaterializedResult]: + # NOTE: Freeze and use this same execution config for the entire execution + daft_execution_config = get_context().daft_execution_config + + # Optimize the logical plan. + builder = builder.optimize() + executor = NativeExecutor.from_logical_plan_builder(builder) + results_gen = executor.run( + {k: v.values() for k, v in self._part_set_cache.get_all_partition_sets().items()}, + daft_execution_config, + results_buffer_size, + ) + yield from results_gen + + def run_iter_tables( + self, builder: LogicalPlanBuilder, results_buffer_size: int | None = None + ) -> Iterator[MicroPartition]: + for result in self.run_iter(builder, results_buffer_size=results_buffer_size): + yield result.partition() diff --git a/daft/runners/pyrunner.py b/daft/runners/pyrunner.py index 4934d481e7..46cd587ba5 100644 --- a/daft/runners/pyrunner.py +++ b/daft/runners/pyrunner.py @@ -23,10 +23,11 @@ PartialPartitionMetadata, PartitionCacheEntry, PartitionMetadata, + PartitionSetCache, ) from daft.runners.profiler import profiler from daft.runners.progress_bar import ProgressBar -from daft.runners.runner import Runner +from daft.runners.runner import LOCAL_PARTITION_SET_CACHE, Runner from daft.table import MicroPartition if TYPE_CHECKING: @@ -316,6 +317,9 @@ def __init__(self, use_thread_pool: bool | None) -> None: memory_bytes, ) + def initialize_partition_set_cache(self) -> PartitionSetCache: + return LOCAL_PARTITION_SET_CACHE + def runner_io(self) -> PyRunnerIO: return PyRunnerIO() @@ -369,6 +373,7 @@ def run_iter( # physical plan to executable tasks. 
if daft_execution_config.enable_native_executor: logger.info("Using native executor") + executor = NativeExecutor.from_logical_plan_builder(builder) results_gen = executor.run( {k: v.values() for k, v in self._part_set_cache.get_all_partition_sets().items()}, diff --git a/daft/runners/ray_runner.py b/daft/runners/ray_runner.py index 1d6eeacd6f..cadc434254 100644 --- a/daft/runners/ray_runner.py +++ b/daft/runners/ray_runner.py @@ -62,6 +62,7 @@ PartitionCacheEntry, PartitionMetadata, PartitionSet, + PartitionSetCache, ) from daft.runners.profiler import profiler from daft.runners.runner import Runner @@ -1152,6 +1153,9 @@ def __init__( use_ray_tqdm=False, ) + def initialize_partition_set_cache(self) -> PartitionSetCache: + return PartitionSetCache() + def active_plans(self) -> list[str]: if self.ray_client_mode: return ray.get(self.scheduler_actor.active_plans.remote()) diff --git a/daft/runners/runner.py b/daft/runners/runner.py index 33ed51a69f..2dfb71ed5e 100644 --- a/daft/runners/runner.py +++ b/daft/runners/runner.py @@ -16,10 +16,12 @@ from daft.runners.runner_io import RunnerIO from daft.table import MicroPartition +LOCAL_PARTITION_SET_CACHE = PartitionSetCache() + class Runner(Generic[PartitionT]): def __init__(self) -> None: - self._part_set_cache = PartitionSetCache() + self._part_set_cache = self.initialize_partition_set_cache() def get_partition_set_from_cache(self, pset_id: str) -> PartitionCacheEntry: return self._part_set_cache.get_partition_set(pset_id=pset_id) @@ -27,6 +29,9 @@ def get_partition_set_from_cache(self, pset_id: str) -> PartitionCacheEntry: def put_partition_set_into_cache(self, pset: PartitionSet) -> PartitionCacheEntry: return self._part_set_cache.put_partition_set(pset=pset) + @abstractmethod + def initialize_partition_set_cache(self) -> PartitionSetCache: ... + @abstractmethod def runner_io(self) -> RunnerIO: ... diff --git a/docs/source/user_guide/poweruser/partitioning.rst b/docs/source/user_guide/poweruser/partitioning.rst index 24339def99..443f86636f 100644 --- a/docs/source/user_guide/poweruser/partitioning.rst +++ b/docs/source/user_guide/poweruser/partitioning.rst @@ -19,7 +19,7 @@ Daft refers to this as a "clustering specification", and you are able to see thi controlling parallelism and how much data is being materialized at a time. However, Daft's new experimental execution engine will remove the concept of partitioning entirely for local execution. - You may enable it with ``DAFT_ENABLE_NATIVE_EXECUTOR=1``. Instead of using partitioning to control parallelism, + You may enable it with ``DAFT_RUNNER=native``. Instead of using partitioning to control parallelism, this new execution engine performs a streaming-based execution on small "morsels" of data, which provides much more stable memory utilization while improving the user experience with not having to worry about partitioning. 
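A minimal usage sketch of the behavior documented above, assuming a Daft build with this PR applied; the sample data, column name, and morsel size below are illustrative, while `set_runner_native()`, `DAFT_RUNNER=native`, and `default_morsel_size` come from this change:

    import daft

    # Select the native (swordfish) runner for this process. Equivalent to
    # exporting DAFT_RUNNER=native before starting Python.
    daft.context.set_runner_native()

    # The native runner streams small "morsels" instead of partitions; the morsel
    # size is tunable via the execution config (defaults to 131072 rows).
    daft.context.set_execution_config(default_morsel_size=65536)

    df = daft.from_pydict({"x": [1, 2, 3, 4]})
    print(df.where(daft.col("x") > 2).to_pydict())
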
diff --git a/src/common/daft-config/Cargo.toml b/src/common/daft-config/Cargo.toml index 212ce37ab2..c37dfc8ecb 100644 --- a/src/common/daft-config/Cargo.toml +++ b/src/common/daft-config/Cargo.toml @@ -1,6 +1,7 @@ [dependencies] common-io-config = {path = "../io-config", default-features = false} common-py-serde = {path = "../py-serde", default-features = false} +log = {workspace = true} pyo3 = {workspace = true, optional = true} serde = {workspace = true} diff --git a/src/common/daft-config/src/lib.rs b/src/common/daft-config/src/lib.rs index 2202b20d39..69dcf80a5e 100644 --- a/src/common/daft-config/src/lib.rs +++ b/src/common/daft-config/src/lib.rs @@ -98,6 +98,7 @@ impl DaftExecutionConfig { if let Ok(val) = std::env::var(exec_env_var_name) && matches!(val.trim().to_lowercase().as_str(), "1" | "true") { + log::warn!("DAFT_ENABLE_NATIVE_EXECUTOR will be deprecated and removed in the future. Please switch to using DAFT_RUNNER=NATIVE instead."); cfg.enable_native_executor = true; } cfg diff --git a/src/daft-local-execution/Cargo.toml b/src/daft-local-execution/Cargo.toml index cab1bc821b..b81cc2f2ce 100644 --- a/src/daft-local-execution/Cargo.toml +++ b/src/daft-local-execution/Cargo.toml @@ -28,7 +28,6 @@ num-format = "0.4.4" pyo3 = {workspace = true, optional = true} snafu = {workspace = true} tokio = {workspace = true} -tokio-stream = {workspace = true} tracing = {workspace = true} [features] diff --git a/src/daft-local-execution/src/dispatcher.rs b/src/daft-local-execution/src/dispatcher.rs index d21fc306b6..08aeceb2eb 100644 --- a/src/daft-local-execution/src/dispatcher.rs +++ b/src/daft-local-execution/src/dispatcher.rs @@ -46,13 +46,17 @@ impl Dispatcher for RoundRobinBufferedDispatcher { while let Some(morsel) = receiver.recv().await { if morsel.should_broadcast() { for worker_sender in &worker_senders { - let _ = worker_sender.send(morsel.clone()).await; + if worker_sender.send(morsel.clone()).await.is_err() { + return Ok(()); + } } } else { buffer.push(morsel.as_data()); if let Some(ready) = buffer.pop_enough()? 
{ for r in ready { - let _ = send_to_next_worker(r.into()).await; + if send_to_next_worker(r.into()).await.is_err() { + return Ok(()); + } } } } @@ -85,7 +89,9 @@ impl Dispatcher for PartitionedDispatcher { while let Some(morsel) = receiver.recv().await { if morsel.should_broadcast() { for worker_sender in &worker_senders { - let _ = worker_sender.send(morsel.clone()).await; + if worker_sender.send(morsel.clone()).await.is_err() { + return Ok(()); + } } } else { let partitions = morsel @@ -93,7 +99,13 @@ impl Dispatcher for PartitionedDispatcher { .partition_by_hash(&self.partition_by, worker_senders.len())?; for (partition, worker_sender) in partitions.into_iter().zip(worker_senders.iter()) { - let _ = worker_sender.send(Arc::new(partition).into()).await; + if worker_sender + .send(Arc::new(partition).into()) + .await + .is_err() + { + return Ok(()); + } } } } diff --git a/src/daft-local-execution/src/intermediate_ops/intermediate_op.rs b/src/daft-local-execution/src/intermediate_ops/intermediate_op.rs index d268120c06..65b3455f57 100644 --- a/src/daft-local-execution/src/intermediate_ops/intermediate_op.rs +++ b/src/daft-local-execution/src/intermediate_ops/intermediate_op.rs @@ -132,14 +132,18 @@ impl IntermediateNode { let result = compute_runtime.spawn(fut).await??; match result { IntermediateOperatorResult::NeedMoreInput(Some(mp)) => { - let _ = sender.send(mp.into()).await; + if sender.send(mp.into()).await.is_err() { + return Ok(()); + } break; } IntermediateOperatorResult::NeedMoreInput(None) => { break; } IntermediateOperatorResult::HasMoreOutput(mp) => { - let _ = sender.send(mp.into()).await; + if sender.send(mp.into()).await.is_err() { + return Ok(()); + } } } } @@ -189,13 +193,17 @@ impl IntermediateNode { while let Some(morsel) = receiver.recv().await { if morsel.should_broadcast() { for worker_sender in &worker_senders { - let _ = worker_sender.send((idx, morsel.clone())).await; + if worker_sender.send((idx, morsel.clone())).await.is_err() { + return Ok(()); + } } } else { buffer.push(morsel.as_data()); if let Some(ready) = buffer.pop_enough()? { for part in ready { - let _ = send_to_next_worker(idx, part.into()).await; + if send_to_next_worker(idx, part.into()).await.is_err() { + return Ok(()); + } } } } diff --git a/src/daft-local-execution/src/pipeline.rs b/src/daft-local-execution/src/pipeline.rs index b23fd8d509..771efd1632 100644 --- a/src/daft-local-execution/src/pipeline.rs +++ b/src/daft-local-execution/src/pipeline.rs @@ -121,16 +121,22 @@ pub fn physical_plan_to_pipeline( let out: Box = match physical_plan { LocalPhysicalPlan::EmptyScan(EmptyScan { schema, .. }) => { let source = EmptyScanSource::new(schema.clone()); - source.boxed().into() + source.arced().into() } - LocalPhysicalPlan::PhysicalScan(PhysicalScan { scan_tasks, .. }) => { - let scan_task_source = ScanTaskSource::new(scan_tasks.clone()); - scan_task_source.boxed().into() + LocalPhysicalPlan::PhysicalScan(PhysicalScan { + scan_tasks, + pushdowns, + schema, + .. + }) => { + let scan_task_source = + ScanTaskSource::new(scan_tasks.clone(), pushdowns.clone(), schema.clone(), cfg); + scan_task_source.arced().into() } LocalPhysicalPlan::InMemoryScan(InMemoryScan { info, .. 
}) => { let partitions = psets.get(&info.cache_key).expect("Cache key not found"); InMemorySource::new(partitions.clone(), info.source_schema.clone()) - .boxed() + .arced() .into() } LocalPhysicalPlan::Project(Project { diff --git a/src/daft-local-execution/src/run.rs b/src/daft-local-execution/src/run.rs index 182c2091a5..0f01ec61e6 100644 --- a/src/daft-local-execution/src/run.rs +++ b/src/daft-local-execution/src/run.rs @@ -130,7 +130,9 @@ pub fn run_local( let mut receiver = pipeline.start(true, &mut runtime_handle)?.get_receiver(); while let Some(val) = receiver.recv().await { - let _ = tx.send(val.as_data().clone()).await; + if tx.send(val.as_data().clone()).await.is_err() { + break; + } } while let Some(result) = runtime_handle.join_next().await { diff --git a/src/daft-local-execution/src/sinks/streaming_sink.rs b/src/daft-local-execution/src/sinks/streaming_sink.rs index 9ebab6f1a3..911b52d652 100644 --- a/src/daft-local-execution/src/sinks/streaming_sink.rs +++ b/src/daft-local-execution/src/sinks/streaming_sink.rs @@ -129,12 +129,18 @@ impl StreamingSinkNode { match result { StreamingSinkOutput::NeedMoreInput(mp) => { if let Some(mp) = mp { - let _ = output_sender.send(mp).await; + if output_sender.send(mp).await.is_err() { + finished = true; + break; + } } break; } StreamingSinkOutput::HasMoreOutput(mp) => { - let _ = output_sender.send(mp).await; + if output_sender.send(mp).await.is_err() { + finished = true; + break; + } } StreamingSinkOutput::Finished(mp) => { if let Some(mp) = mp { @@ -191,10 +197,12 @@ impl StreamingSinkNode { while let Some(morsel) = receiver.recv().await { if morsel.should_broadcast() { for worker_sender in &worker_senders { - let _ = worker_sender.send((idx, morsel.clone())).await; + if worker_sender.send((idx, morsel.clone())).await.is_err() { + return Ok(()); + } } - } else { - let _ = send_to_next_worker(idx, morsel.clone()).await; + } else if send_to_next_worker(idx, morsel.clone()).await.is_err() { + return Ok(()); } } } @@ -270,7 +278,9 @@ impl PipelineNode for StreamingSinkNode { ); while let Some(morsel) = output_receiver.recv().await { - let _ = destination_sender.send(morsel.into()).await; + if destination_sender.send(morsel.into()).await.is_err() { + break; + } } let mut finished_states = Vec::with_capacity(num_workers); diff --git a/src/daft-local-execution/src/sources/empty_scan.rs b/src/daft-local-execution/src/sources/empty_scan.rs index 1003902662..785ce9fc17 100644 --- a/src/daft-local-execution/src/sources/empty_scan.rs +++ b/src/daft-local-execution/src/sources/empty_scan.rs @@ -1,12 +1,14 @@ use std::sync::Arc; +use async_trait::async_trait; +use common_error::DaftResult; use daft_core::prelude::SchemaRef; use daft_io::IOStatsRef; use daft_micropartition::MicroPartition; use tracing::instrument; use super::source::Source; -use crate::{sources::source::SourceStream, ExecutionRuntimeHandle}; +use crate::sources::source::SourceStream; pub struct EmptyScanSource { schema: SchemaRef, @@ -16,23 +18,26 @@ impl EmptyScanSource { pub fn new(schema: SchemaRef) -> Self { Self { schema } } - pub fn boxed(self) -> Box { - Box::new(self) as Box + pub fn arced(self) -> Arc { + Arc::new(self) as Arc } } +#[async_trait] impl Source for EmptyScanSource { #[instrument(name = "EmptyScanSource::get_data", level = "info", skip_all)] - fn get_data( + async fn get_data( &self, _maintain_order: bool, - _runtime_handle: &mut ExecutionRuntimeHandle, _io_stats: IOStatsRef, - ) -> crate::Result> { + ) -> DaftResult> { let empty = 
Arc::new(MicroPartition::empty(Some(self.schema.clone()))); - Ok(Box::pin(futures::stream::once(async { empty }))) + Ok(Box::pin(futures::stream::once(async { Ok(empty) }))) } fn name(&self) -> &'static str { "EmptyScanSource" } + fn schema(&self) -> &SchemaRef { + &self.schema + } } diff --git a/src/daft-local-execution/src/sources/in_memory.rs b/src/daft-local-execution/src/sources/in_memory.rs index 1bf08a8913..c27547b80f 100644 --- a/src/daft-local-execution/src/sources/in_memory.rs +++ b/src/daft-local-execution/src/sources/in_memory.rs @@ -1,12 +1,14 @@ use std::sync::Arc; +use async_trait::async_trait; +use common_error::DaftResult; use daft_core::prelude::SchemaRef; use daft_io::IOStatsRef; use daft_micropartition::MicroPartition; use tracing::instrument; use super::source::Source; -use crate::{sources::source::SourceStream, ExecutionRuntimeHandle}; +use crate::sources::source::SourceStream; pub struct InMemorySource { data: Vec>, @@ -17,26 +19,27 @@ impl InMemorySource { pub fn new(data: Vec>, schema: SchemaRef) -> Self { Self { data, schema } } - pub fn boxed(self) -> Box { - Box::new(self) as Box + pub fn arced(self) -> Arc { + Arc::new(self) as Arc } } +#[async_trait] impl Source for InMemorySource { #[instrument(name = "InMemorySource::get_data", level = "info", skip_all)] - fn get_data( + async fn get_data( &self, _maintain_order: bool, - _runtime_handle: &mut ExecutionRuntimeHandle, _io_stats: IOStatsRef, - ) -> crate::Result> { - if self.data.is_empty() { - let empty = Arc::new(MicroPartition::empty(Some(self.schema.clone()))); - return Ok(Box::pin(futures::stream::once(async { empty }))); - } - Ok(Box::pin(futures::stream::iter(self.data.clone()))) + ) -> DaftResult> { + Ok(Box::pin(futures::stream::iter( + self.data.clone().into_iter().map(Ok), + ))) } fn name(&self) -> &'static str { "InMemory" } + fn schema(&self) -> &SchemaRef { + &self.schema + } } diff --git a/src/daft-local-execution/src/sources/scan_task.rs b/src/daft-local-execution/src/sources/scan_task.rs index 1bd4bac0d7..93cea42e87 100644 --- a/src/daft-local-execution/src/sources/scan_task.rs +++ b/src/daft-local-execution/src/sources/scan_task.rs @@ -3,115 +3,128 @@ use std::{ sync::Arc, }; +use async_trait::async_trait; +use common_daft_config::DaftExecutionConfig; use common_error::DaftResult; use common_file_formats::{FileFormatConfig, ParquetSourceConfig}; -use daft_core::prelude::{AsArrow, Int64Array, Utf8Array}; +use common_runtime::get_io_runtime; +use daft_core::prelude::{AsArrow, Int64Array, SchemaRef, Utf8Array}; use daft_csv::{CsvConvertOptions, CsvParseOptions, CsvReadOptions}; use daft_io::IOStatsRef; use daft_json::{JsonConvertOptions, JsonParseOptions, JsonReadOptions}; use daft_micropartition::MicroPartition; use daft_parquet::read::{read_parquet_bulk_async, ParquetSchemaInferenceOptions}; -use daft_scan::{storage_config::StorageConfig, ChunkSpec, ScanTask}; -use futures::{Stream, StreamExt}; +use daft_scan::{storage_config::StorageConfig, ChunkSpec, Pushdowns, ScanTask}; +use futures::{Stream, StreamExt, TryStreamExt}; use snafu::ResultExt; -use tokio_stream::wrappers::ReceiverStream; use tracing::instrument; use crate::{ - channel::{create_channel, Sender}, sources::source::{Source, SourceStream}, - ExecutionRuntimeHandle, JoinSnafu, TaskSet, NUM_CPUS, + NUM_CPUS, }; pub struct ScanTaskSource { scan_tasks: Vec>, + num_parallel_tasks: usize, + schema: SchemaRef, } impl ScanTaskSource { - pub fn new(scan_tasks: Vec>) -> Self { - Self { scan_tasks } - } + pub fn new( + scan_tasks: Vec>, + 
pushdowns: Pushdowns, + schema: SchemaRef, + cfg: &DaftExecutionConfig, + ) -> Self { + // Determine the number of parallel tasks to run based on available CPU cores and row limits + let mut num_parallel_tasks = match pushdowns.limit { + // If we have a row limit, we need to calculate how many parallel tasks we can run + // without exceeding the limit + Some(limit) => { + let mut count = 0; + let mut remaining_rows = limit as f64; - #[instrument( - name = "ScanTaskSource::process_scan_task_stream", - level = "info", - skip_all - )] - async fn process_scan_task_stream( - scan_task: Arc, - sender: Sender>, - maintain_order: bool, - io_stats: IOStatsRef, - delete_map: Option>>>, - ) -> DaftResult<()> { - let schema = scan_task.materialized_schema(); - let mut stream = - stream_scan_task(scan_task, Some(io_stats), delete_map, maintain_order).await?; - let mut has_data = false; - while let Some(partition) = stream.next().await { - let _ = sender.send(partition?).await; - has_data = true; - } - if !has_data { - let empty = Arc::new(MicroPartition::empty(Some(schema.clone()))); - let _ = sender.send(empty).await; + // Only examine tasks up to the number of available CPU cores + for scan_task in scan_tasks.iter().take(*NUM_CPUS) { + match scan_task.approx_num_rows(Some(cfg)) { + // If we can estimate the number of rows for this task + Some(estimated_rows) => { + remaining_rows -= estimated_rows; + count += 1; + + // Stop adding tasks if we would exceed the row limit + if remaining_rows <= 0.0 { + break; + } + } + // If we can't estimate rows, conservatively include the task + // This ensures we don't underutilize available resources + None => count += 1, + } + } + count + } + // If there's no row limit, use all available CPU cores + None => *NUM_CPUS, + }; + num_parallel_tasks = num_parallel_tasks.min(scan_tasks.len()); + Self { + scan_tasks, + num_parallel_tasks, + schema, } - Ok(()) } - pub fn boxed(self) -> Box { - Box::new(self) as Box + + pub fn arced(self) -> Arc { + Arc::new(self) as Arc } } + +#[async_trait] impl Source for ScanTaskSource { #[instrument(name = "ScanTaskSource::get_data", level = "info", skip_all)] - fn get_data( + async fn get_data( &self, maintain_order: bool, - runtime_handle: &mut ExecutionRuntimeHandle, io_stats: IOStatsRef, - ) -> crate::Result> { - let (senders, receivers): (Vec<_>, Vec<_>) = if maintain_order { - (0..self.scan_tasks.len()) - .map(|_| create_channel(1)) - .unzip() - } else { - let (sender, receiver) = create_channel(self.scan_tasks.len()); - ( - std::iter::repeat(sender) - .take(self.scan_tasks.len()) - .collect(), - vec![receiver], - ) - }; - let scan_tasks = self.scan_tasks.clone(); - runtime_handle.spawn( - async move { - let mut task_set = TaskSet::new(); - let delete_map = get_delete_map(&scan_tasks).await?.map(Arc::new); - for (scan_task, sender) in scan_tasks.into_iter().zip(senders) { - task_set.spawn(Self::process_scan_task_stream( - scan_task, - sender, - maintain_order, - io_stats.clone(), - delete_map.clone(), - )); - } - while let Some(result) = task_set.join_next().await { - result.context(JoinSnafu)??; - } - Ok(()) - }, - self.name(), - ); + ) -> DaftResult> { + let io_runtime = get_io_runtime(true); + let delete_map = get_delete_map(&self.scan_tasks).await?.map(Arc::new); + let stream_of_streams = + futures::stream::iter(self.scan_tasks.clone().into_iter().map(move |scan_task| { + let io_stats = io_stats.clone(); + let delete_map = delete_map.clone(); + io_runtime.spawn(async move { + stream_scan_task(scan_task, io_stats, delete_map, 
maintain_order).await + }) + })); - let stream = futures::stream::iter(receivers.into_iter().map(ReceiverStream::new)); - Ok(Box::pin(stream.flatten())) + match maintain_order { + true => { + let buffered_and_flattened = stream_of_streams + .buffered(self.num_parallel_tasks) + .map(|r| r?) + .try_flatten(); + Ok(Box::pin(buffered_and_flattened)) + } + false => { + let buffered_and_flattened = stream_of_streams + .buffer_unordered(self.num_parallel_tasks) + .map(|r| r?) + .try_flatten_unordered(None); + Ok(Box::pin(buffered_and_flattened)) + } + } } fn name(&self) -> &'static str { "ScanTask" } + + fn schema(&self) -> &SchemaRef { + &self.schema + } } // Read all iceberg delete files and return a map of file paths to delete positions @@ -188,7 +201,7 @@ async fn get_delete_map( async fn stream_scan_task( scan_task: Arc, - io_stats: Option, + io_stats: IOStatsRef, delete_map: Option>>>, maintain_order: bool, ) -> DaftResult>> + Send> { @@ -266,7 +279,7 @@ async fn stream_scan_task( row_groups, scan_task.pushdowns.filters.clone(), io_client, - io_stats, + Some(io_stats), &inference_options, field_id_mapping.clone(), metadata, @@ -315,7 +328,7 @@ async fn stream_scan_task( Some(parse_options), Some(read_options), io_client, - io_stats.clone(), + Some(io_stats.clone()), None, // maintain_order, TODO: Implement maintain_order for CSV ) @@ -340,7 +353,7 @@ async fn stream_scan_task( Some(parse_options), Some(read_options), io_client, - io_stats, + Some(io_stats), None, // maintain_order, TODO: Implement maintain_order for JSON ) diff --git a/src/daft-local-execution/src/sources/source.rs b/src/daft-local-execution/src/sources/source.rs index 8c55401db2..0073d18c82 100644 --- a/src/daft-local-execution/src/sources/source.rs +++ b/src/daft-local-execution/src/sources/source.rs @@ -1,6 +1,9 @@ use std::sync::Arc; +use async_trait::async_trait; use common_display::{tree::TreeDisplay, utils::bytes_to_human_readable}; +use common_error::DaftResult; +use daft_core::prelude::SchemaRef; use daft_io::{IOStatsContext, IOStatsRef}; use daft_micropartition::MicroPartition; use futures::{stream::BoxStream, StreamExt}; @@ -10,20 +13,21 @@ use crate::{ ExecutionRuntimeHandle, }; -pub type SourceStream<'a> = BoxStream<'a, Arc>; +pub type SourceStream<'a> = BoxStream<'a, DaftResult>>; +#[async_trait] pub trait Source: Send + Sync { fn name(&self) -> &'static str; - fn get_data( + async fn get_data( &self, maintain_order: bool, - runtime_handle: &mut ExecutionRuntimeHandle, io_stats: IOStatsRef, - ) -> crate::Result>; + ) -> DaftResult>; + fn schema(&self) -> &SchemaRef; } struct SourceNode { - source: Box, + source: Arc, runtime_stats: Arc, io_stats: IOStatsRef, } @@ -70,16 +74,23 @@ impl PipelineNode for SourceNode { maintain_order: bool, runtime_handle: &mut ExecutionRuntimeHandle, ) -> crate::Result { - let mut source_stream = - self.source - .get_data(maintain_order, runtime_handle, self.io_stats.clone())?; - + let source = self.source.clone(); + let io_stats = self.io_stats.clone(); let mut channel = PipelineChannel::new(1, maintain_order); let counting_sender = channel.get_next_sender_with_stats(&self.runtime_stats); runtime_handle.spawn( async move { + let mut has_data = false; + let mut source_stream = source.get_data(maintain_order, io_stats).await?; while let Some(part) = source_stream.next().await { - let _ = counting_sender.send(part.into()).await; + has_data = true; + if counting_sender.send(part?.into()).await.is_err() { + return Ok(()); + } + } + if !has_data { + let empty = 
Arc::new(MicroPartition::empty(Some(source.schema().clone()))); + let _ = counting_sender.send(empty.into()).await; } Ok(()) }, @@ -92,8 +103,8 @@ impl PipelineNode for SourceNode { } } -impl From> for Box { - fn from(source: Box) -> Self { +impl From> for Box { + fn from(source: Arc) -> Self { let name = source.name(); Box::new(SourceNode { source, diff --git a/src/daft-local-plan/src/plan.rs b/src/daft-local-plan/src/plan.rs index ed4e5be46c..1a20d4ba86 100644 --- a/src/daft-local-plan/src/plan.rs +++ b/src/daft-local-plan/src/plan.rs @@ -4,7 +4,7 @@ use common_resource_request::ResourceRequest; use daft_core::prelude::*; use daft_dsl::{AggExpr, ExprRef}; use daft_logical_plan::{InMemoryInfo, OutputFileInfo}; -use daft_scan::{ScanTask, ScanTaskRef}; +use daft_scan::{Pushdowns, ScanTask, ScanTaskRef}; pub type LocalPhysicalPlanRef = Arc; #[derive(Debug, strum::IntoStaticStr)] @@ -66,10 +66,12 @@ impl LocalPhysicalPlan { pub(crate) fn physical_scan( scan_tasks: Vec, + pushdowns: Pushdowns, schema: SchemaRef, ) -> LocalPhysicalPlanRef { Self::PhysicalScan(PhysicalScan { scan_tasks, + pushdowns, schema, plan_stats: PlanStats {}, }) @@ -354,6 +356,7 @@ pub struct InMemoryScan { #[derive(Debug)] pub struct PhysicalScan { pub scan_tasks: Vec, + pub pushdowns: Pushdowns, pub schema: SchemaRef, pub plan_stats: PlanStats, } diff --git a/src/daft-local-plan/src/translate.rs b/src/daft-local-plan/src/translate.rs index 1106fffaeb..606b68e52e 100644 --- a/src/daft-local-plan/src/translate.rs +++ b/src/daft-local-plan/src/translate.rs @@ -19,6 +19,7 @@ pub fn translate(plan: &LogicalPlanRef) -> DaftResult { } else { Ok(LocalPhysicalPlan::physical_scan( scan_tasks, + info.pushdowns.clone(), source.output_schema.clone(), )) } diff --git a/tests/actor_pool/test_actor_cuda_devices.py b/tests/actor_pool/test_actor_cuda_devices.py index 4f8bf6c410..4da1f15fa1 100644 --- a/tests/actor_pool/test_actor_cuda_devices.py +++ b/tests/actor_pool/test_actor_cuda_devices.py @@ -13,8 +13,7 @@ from daft.udf import udf pytestmark = pytest.mark.skipif( - get_context().daft_execution_config.enable_native_executor is True, - reason="Native executor fails for these tests", + get_context().runner_config.name == "native", reason="Native runner does not support GPU tests yet" ) diff --git a/tests/benchmarks/conftest.py b/tests/benchmarks/conftest.py index 3c49a2733f..0c46545acd 100644 --- a/tests/benchmarks/conftest.py +++ b/tests/benchmarks/conftest.py @@ -91,10 +91,8 @@ def gen_tpch(request): def get_df(gen_tpch, request): (csv_files_location, parquet_files_location, in_memory_tables, num_parts), _ = gen_tpch source_type = request.param - print(f"Source Type: {source_type}") def _get_df(tbl_name: str): - print(f"Table Name: {tbl_name}, Source Type: {source_type}") if source_type == "csv": local_fs = LocalFileSystem() nonchunked_filepath = f"{csv_files_location}/{tbl_name}.tbl" diff --git a/tests/benchmarks/test_local_tpch.py b/tests/benchmarks/test_local_tpch.py index eed56f4b0e..5cff319f44 100644 --- a/tests/benchmarks/test_local_tpch.py +++ b/tests/benchmarks/test_local_tpch.py @@ -22,7 +22,7 @@ @pytest.mark.skipif( - daft.context.get_context().runner_config.name not in {"py"}, + daft.context.get_context().runner_config.name not in {"py", "native"}, reason="requires PyRunner to be in use", ) @pytest.mark.benchmark(group="tpch") @@ -32,16 +32,15 @@ def test_tpch(tmp_path, check_answer, get_df, benchmark_with_memray, engine, q): def f(): if engine == "native": - ctx = 
daft.context.execution_config_ctx(enable_native_executor=True) + daft.context.set_runner_native() elif engine == "python": - ctx = daft.context.execution_config_ctx(enable_native_executor=False) + daft.context.set_runner_py() else: raise ValueError(f"{engine} unsupported") - with ctx: - question = getattr(answers, f"q{q}") - daft_df = question(get_df) - return daft_df.to_arrow() + question = getattr(answers, f"q{q}") + daft_df = question(get_df) + return daft_df.to_arrow() benchmark_group = f"q{q}-parts-{num_parts}" daft_pd_df = benchmark_with_memray(f, benchmark_group).to_pandas() diff --git a/tests/benchmarks/test_streaming_writes.py b/tests/benchmarks/test_streaming_writes.py index eaab068101..cb44d9f902 100644 --- a/tests/benchmarks/test_streaming_writes.py +++ b/tests/benchmarks/test_streaming_writes.py @@ -38,22 +38,18 @@ def test_streaming_write( def f(): if engine == "native": - ctx = daft.context.execution_config_ctx( - enable_native_executor=True, - parquet_target_filesize=target_file_size, - parquet_target_row_group_size=target_row_group_size, - csv_target_filesize=target_file_size, - ) + daft.context.set_runner_native() elif engine == "python": - ctx = daft.context.execution_config_ctx( - enable_native_executor=False, - parquet_target_filesize=target_file_size, - parquet_target_row_group_size=target_row_group_size, - csv_target_filesize=target_file_size, - ) + daft.context.set_runner_py() else: raise ValueError(f"{engine} unsupported") + ctx = daft.context.execution_config_ctx( + parquet_target_filesize=target_file_size, + parquet_target_row_group_size=target_row_group_size, + csv_target_filesize=target_file_size, + ) + with ctx: if file_type == "parquet": return daft_df.write_parquet(tmp_path, partition_cols=partition_cols) diff --git a/tests/conftest.py b/tests/conftest.py index f91dc42797..5bed9eb098 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -175,7 +175,7 @@ def assert_df_equals( @pytest.fixture( scope="function", - params=[1, None] if daft.context.get_context().daft_execution_config.enable_native_executor else [None], + params=[1, None] if daft.context.get_context().runner_config.name == "native" else [None], ) def with_morsel_size(request): morsel_size = request.param diff --git a/tests/cookbook/conftest.py b/tests/cookbook/conftest.py index f62b74203e..2c328e161d 100644 --- a/tests/cookbook/conftest.py +++ b/tests/cookbook/conftest.py @@ -44,7 +44,7 @@ def service_requests_csv_pd_df(): @pytest.fixture( scope="module", - params=[1, 2] if daft.context.get_context().daft_execution_config.enable_native_executor is False else [1], + params=[1, 2] if daft.context.get_context().runner_config.name != "native" else [1], ) def repartition_nparts(request): """Adds a `n_repartitions` parameter to test cases which provides the number of diff --git a/tests/cookbook/test_joins.py b/tests/cookbook/test_joins.py index 856f33e653..0d6fa9f940 100644 --- a/tests/cookbook/test_joins.py +++ b/tests/cookbook/test_joins.py @@ -8,7 +8,7 @@ def skip_invalid_join_strategies(join_strategy): - if context.get_context().daft_execution_config.enable_native_executor is True: + if context.get_context().runner_config.name == "native": if join_strategy not in [None, "hash"]: pytest.skip("Native executor fails for these tests") diff --git a/tests/dataframe/test_joins.py b/tests/dataframe/test_joins.py index ceac56283f..69455cadb3 100644 --- a/tests/dataframe/test_joins.py +++ b/tests/dataframe/test_joins.py @@ -11,7 +11,7 @@ def skip_invalid_join_strategies(join_strategy, join_type): - if 
context.get_context().daft_execution_config.enable_native_executor is True: + if context.get_context().runner_config.name == "native": if join_strategy not in [None, "hash"]: pytest.skip("Native executor fails for these tests") else: diff --git a/tests/dataframe/test_monotonically_increasing_id.py b/tests/dataframe/test_monotonically_increasing_id.py index fa57e89216..3174f1a465 100644 --- a/tests/dataframe/test_monotonically_increasing_id.py +++ b/tests/dataframe/test_monotonically_increasing_id.py @@ -6,7 +6,7 @@ from daft.datatype import DataType pytestmark = pytest.mark.skipif( - context.get_context().daft_execution_config.enable_native_executor is True, + context.get_context().runner_config.name == "native", reason="Native executor fails for these tests", ) diff --git a/tests/integration/io/test_s3_reads_include_path.py b/tests/integration/io/test_s3_reads_include_path.py index c1a54d24a5..0386d9aeb0 100644 --- a/tests/integration/io/test_s3_reads_include_path.py +++ b/tests/integration/io/test_s3_reads_include_path.py @@ -14,10 +14,10 @@ def test_read_parquet_from_s3_with_include_file_path_column(minio_io_config): ) assert len(file_paths) == 1 file_path = f"s3://{file_paths[0]}" - read_back = daft.read_parquet(file_path, io_config=minio_io_config, file_path_column="path") - assert read_back.to_pydict()["a"] == data["a"] - assert read_back.to_pydict()["b"] == data["b"] - assert read_back.to_pydict()["path"] == [file_path] * 3 + read_back = daft.read_parquet(file_path, io_config=minio_io_config, file_path_column="path").to_pydict() + assert read_back["a"] == data["a"] + assert read_back["b"] == data["b"] + assert read_back["path"] == [file_path] * 3 @pytest.mark.integration() @@ -27,16 +27,15 @@ def test_read_multi_parquet_from_s3_with_include_file_path_column(minio_io_confi with minio_create_bucket(minio_io_config, bucket_name=bucket_name): file_paths = ( daft.from_pydict(data) - .into_partitions(3) - .write_parquet(f"s3://{bucket_name}", io_config=minio_io_config) + .write_parquet(f"s3://{bucket_name}", partition_cols=["a"], io_config=minio_io_config) .to_pydict()["path"] ) assert len(file_paths) == 3 - file_paths = [f"s3://{path}" for path in file_paths] - read_back = daft.read_parquet(file_paths, io_config=minio_io_config, file_path_column="path") - assert read_back.to_pydict()["a"] == data["a"] - assert read_back.to_pydict()["b"] == data["b"] - assert read_back.to_pydict()["path"] == file_paths + file_paths = sorted([f"s3://{path}" for path in file_paths]) + read_back = daft.read_parquet(file_paths, io_config=minio_io_config, file_path_column="path").to_pydict() + assert read_back["a"] == data["a"] + assert read_back["b"] == data["b"] + assert read_back["path"] == file_paths @pytest.mark.integration() @@ -49,10 +48,10 @@ def test_read_csv_from_s3_with_include_file_path_column(minio_io_config): ) assert len(file_paths) == 1 file_path = f"s3://{file_paths[0]}" - read_back = daft.read_csv(file_path, io_config=minio_io_config, file_path_column="path") - assert read_back.to_pydict()["a"] == data["a"] - assert read_back.to_pydict()["b"] == data["b"] - assert read_back.to_pydict()["path"] == [file_path] * 3 + read_back = daft.read_csv(file_path, io_config=minio_io_config, file_path_column="path").to_pydict() + assert read_back["a"] == data["a"] + assert read_back["b"] == data["b"] + assert read_back["path"] == [file_path] * 3 @pytest.mark.integration() @@ -62,13 +61,12 @@ def test_read_multi_csv_from_s3_with_include_file_path_column(minio_io_config): with 
minio_create_bucket(minio_io_config, bucket_name=bucket_name): file_paths = ( daft.from_pydict(data) - .into_partitions(3) - .write_csv(f"s3://{bucket_name}", io_config=minio_io_config) + .write_csv(f"s3://{bucket_name}", io_config=minio_io_config, partition_cols=["a"]) .to_pydict()["path"] ) assert len(file_paths) == 3 - file_paths = [f"s3://{path}" for path in file_paths] - read_back = daft.read_csv(file_paths, io_config=minio_io_config, file_path_column="path") - assert read_back.to_pydict()["a"] == data["a"] - assert read_back.to_pydict()["b"] == data["b"] - assert read_back.to_pydict()["path"] == file_paths + file_paths = sorted([f"s3://{path}" for path in file_paths]) + read_back = daft.read_csv(file_paths, io_config=minio_io_config, file_path_column="path").to_pydict() + assert read_back["a"] == data["a"] + assert read_back["b"] == data["b"] + assert read_back["path"] == file_paths diff --git a/tests/io/delta_lake/test_table_write.py b/tests/io/delta_lake/test_table_write.py index 5d8814b016..5b7818fb8a 100644 --- a/tests/io/delta_lake/test_table_write.py +++ b/tests/io/delta_lake/test_table_write.py @@ -111,7 +111,7 @@ def test_deltalake_write_overwrite_cloud(cloud_paths): @pytest.mark.skipif( - context.get_context().daft_execution_config.enable_native_executor is True, + context.get_context().runner_config.name == "native", reason="Native executor does not support repartitioning", ) def test_deltalake_write_overwrite_multi_partition(tmp_path): @@ -184,7 +184,7 @@ def test_deltalake_write_ignore(tmp_path): @pytest.mark.skipif( - context.get_context().daft_execution_config.enable_native_executor is True, + context.get_context().runner_config.name == "native", reason="Native executor does not support repartitioning", ) def test_deltalake_write_with_empty_partition(tmp_path, base_table): diff --git a/tests/io/iceberg/test_iceberg_writes.py b/tests/io/iceberg/test_iceberg_writes.py index 61944e3ed6..6c9da4d874 100644 --- a/tests/io/iceberg/test_iceberg_writes.py +++ b/tests/io/iceberg/test_iceberg_writes.py @@ -204,8 +204,8 @@ def test_read_after_write_nested_fields(local_catalog): @pytest.mark.skipif( - context.get_context().daft_execution_config.enable_native_executor is True, - reason="Native executor does not into_partitions", + context.get_context().runner_config.name == "native", + reason="Native executor does not support into_partitions", ) def test_read_after_write_with_empty_partition(local_catalog): df = daft.from_pydict({"x": [1, 2, 3]}).into_partitions(4) diff --git a/tests/io/lancedb/test_lancedb_writes.py b/tests/io/lancedb/test_lancedb_writes.py index bc4036e563..2622fa02ca 100644 --- a/tests/io/lancedb/test_lancedb_writes.py +++ b/tests/io/lancedb/test_lancedb_writes.py @@ -7,7 +7,7 @@ from daft import context native_executor_skip = pytest.mark.skipif( - context.get_context().daft_execution_config.enable_native_executor is True, + context.get_context().runner_config.name == "native", reason="Native executor fails for these tests", ) diff --git a/tests/io/test_write_modes.py b/tests/io/test_write_modes.py index dde38f89a2..187f8b2cf8 100644 --- a/tests/io/test_write_modes.py +++ b/tests/io/test_write_modes.py @@ -5,12 +5,6 @@ import s3fs import daft -from daft import context - -pytestmark = pytest.mark.skipif( - context.get_context().daft_execution_config.enable_native_executor is True, - reason="Native executor doesn't support writes yet", -) def write( diff --git a/tests/test_resource_requests.py b/tests/test_resource_requests.py index 3cab797498..5f76a8ef24 100644 --- 
a/tests/test_resource_requests.py +++ b/tests/test_resource_requests.py @@ -14,8 +14,7 @@ from daft.internal.gpu import cuda_visible_devices pytestmark = pytest.mark.skipif( - context.get_context().daft_execution_config.enable_native_executor is True, - reason="Native executor fails for these tests", + context.get_context().runner_config.name == "native", reason="Native runner does not support resource requests" )
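
As a closing illustration of the early-termination scenario that the `is_err` checks in this PR handle (a consumer dropping its receiving end, for example a user breaking out of `iter(df)`), here is a hedged sketch; it is not part of the patch, and the data size and break condition are assumptions:

    import daft

    daft.context.set_runner_native()
    df = daft.from_pydict({"x": list(range(1_000_000))})

    # Iterating a dataframe streams results; breaking out early drops the
    # Python-side receiver, and the senders in the streaming pipeline detect the
    # closed channel via `send(...).is_err()` and shut down instead of producing
    # morsels that nobody will consume.
    for i, row in enumerate(iter(df)):
        if i >= 10:
            break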