[FEAT] Native Runner (#3178)
Makes swordfish a top-level runner, `set_runner_native()`. Additionally
sets swordfish as the default runner for development builds.
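The runner-selection precedence this PR sets up (an explicit `DAFT_RUNNER` environment variable wins, dev builds default to the native runner, everything else falls back to the PyRunner) can be sketched as a standalone function. This is a simplified illustration, not the actual `daft.context` code; `select_runner` and its `build_type` parameter are hypothetical names for this sketch:

```python
import os


def select_runner(build_type: str, env: dict = os.environ) -> str:
    """Pick a runner name: DAFT_RUNNER env var first, then the native
    runner for dev builds, then the py runner as the final fallback."""
    runner = env.get("DAFT_RUNNER")
    if runner is not None:
        runner = runner.lower()
        if runner not in {"py", "ray", "native"}:
            raise ValueError(f"Unsupported DAFT_RUNNER variable: {runner}")
        return runner
    if build_type == "dev":
        return "native"
    return "py"
```

Note that the environment variable takes priority even in a dev build, so CI jobs that export `DAFT_RUNNER` keep working unchanged.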

This PR also contains some bug fixes and test changes, on which I have
left comments.

Additionally, this PR refactors swordfish in two ways:
1. Buffers scan tasks based on a `num_parallel_tasks` parameter that
takes into account any pushed-down limits.
2. Adds an `is_err` check on the sender in parts of the code with a
`while receiver.recv().await -> sender.send(...)` pattern, so that the
loop breaks if the send fails because the receiver was dropped. This is
needed when the consumer is done receiving data, such as in a Limit, or
when the user breaks out of `iter(df)`, both of which cause receivers to
be dropped. The senders should recognize this and drop as well.
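The early-termination pattern in point 2 can be illustrated with a minimal single-threaded analogue of a channel. `Channel`, `ClosedChannel`, and `forward` are invented names for this sketch and are not part of the codebase:

```python
class ClosedChannel(Exception):
    """Raised on send after the receiving side has been dropped."""


class Channel:
    """A minimal single-threaded stand-in for an async MPSC channel."""

    def __init__(self):
        self.items = []
        self._closed = False

    def send(self, item):
        if self._closed:
            raise ClosedChannel()
        self.items.append(item)

    def close(self):
        """Simulates the receiver being dropped (e.g. an abandoned iter(df))."""
        self._closed = True


def forward(upstream, sender):
    """The `recv -> send` forwarding loop: break as soon as a send fails,
    instead of draining the whole upstream. Returns the forwarded count."""
    forwarded = 0
    for item in upstream:
        try:
            sender.send(item)
        except ClosedChannel:
            break  # downstream is done (Limit satisfied, iterator dropped)
        forwarded += 1
    return forwarded
```

Without the break, the producer side would keep computing and sending into a channel nobody reads, wasting work after the consumer has finished.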

---------

Co-authored-by: Colin Ho <[email protected]>
Co-authored-by: Colin Ho <[email protected]>
3 people authored Nov 7, 2024
1 parent 2b71ffb commit 6e28b3f
Showing 38 changed files with 425 additions and 228 deletions.
34 changes: 14 additions & 20 deletions .github/workflows/python-package.yml
@@ -25,20 +25,21 @@ jobs:
fail-fast: false
matrix:
python-version: ['3.8', '3.10']
daft-runner: [py, ray]
daft-runner: [py, ray, native]
pyarrow-version: [7.0.0, 16.0.0]
enable-native-executor: [0, 1]
os: [ubuntu-20.04, windows-latest]
exclude:
- daft-runner: ray
enable-native-executor: 1
- daft-runner: ray
pyarrow-version: 7.0.0
os: ubuntu-20.04
- daft-runner: py
python-version: '3.10'
pyarrow-version: 7.0.0
os: ubuntu-20.04
- daft-runner: native
python-version: '3.10'
pyarrow-version: 7.0.0
os: ubuntu-20.04
- python-version: '3.8'
pyarrow-version: 16.0.0
- os: windows-latest
@@ -118,7 +119,6 @@ jobs:
CARGO_TARGET_DIR: ./target

DAFT_RUNNER: ${{ matrix.daft-runner }}
DAFT_ENABLE_NATIVE_EXECUTOR: ${{ matrix.enable-native-executor }}

- name: Build library and Test with pytest (Windows)
if: ${{ (runner.os == 'Windows') }}
@@ -220,7 +220,7 @@ jobs:
fail-fast: false
matrix:
python-version: ['3.8']
daft-runner: [py, ray]
daft-runner: [py, ray, native]
steps:
- uses: actions/checkout@v4
with:
@@ -295,7 +295,7 @@ jobs:
fail-fast: false
matrix:
python-version: ['3.8'] # can't use 3.7 due to requiring anon mode for adlfs
daft-runner: [py, ray]
daft-runner: [py, ray, native]
steps:
- uses: actions/checkout@v4
with:
@@ -373,7 +373,7 @@ jobs:
fail-fast: false
matrix:
python-version: ['3.8'] # can't use 3.7 due to requiring anon mode for adlfs
daft-runner: [py, ray]
daft-runner: [py, ray, native]
# These permissions are needed to interact with GitHub's OIDC Token endpoint.
# This is used in the step "Assume GitHub Actions AWS Credentials"
permissions:
@@ -467,11 +467,7 @@ jobs:
fail-fast: false
matrix:
python-version: ['3.8'] # can't use 3.7 due to requiring anon mode for adlfs
daft-runner: [py, ray]
enable-native-executor: [0, 1]
exclude:
- daft-runner: ray
enable-native-executor: 1
daft-runner: [py, ray, native]
steps:
- uses: actions/checkout@v4
with:
@@ -517,7 +513,6 @@ jobs:
pytest tests/integration/iceberg -m 'integration' --durations=50
env:
DAFT_RUNNER: ${{ matrix.daft-runner }}
DAFT_ENABLE_NATIVE_EXECUTOR: ${{ matrix.enable-native-executor }}
- name: Send Slack notification on failure
uses: slackapi/[email protected]
if: ${{ failure() && (github.ref == 'refs/heads/main') }}
@@ -549,11 +544,7 @@ jobs:
fail-fast: false
matrix:
python-version: ['3.8'] # can't use 3.7 due to requiring anon mode for adlfs
daft-runner: [py, ray]
enable-native-executor: [0, 1]
exclude:
- daft-runner: ray
enable-native-executor: 1
daft-runner: [py, ray, native]
steps:
- uses: actions/checkout@v4
with:
Expand Down Expand Up @@ -598,7 +589,6 @@ jobs:
pytest tests/integration/sql -m 'integration or not integration' --durations=50
env:
DAFT_RUNNER: ${{ matrix.daft-runner }}
DAFT_ENABLE_NATIVE_EXECUTOR: ${{ matrix.enable-native-executor }}
- name: Send Slack notification on failure
uses: slackapi/[email protected]
if: ${{ failure() && (github.ref == 'refs/heads/main') }}
@@ -681,6 +671,8 @@ jobs:
uses: CodSpeedHQ/action@v3
with:
run: pytest tests/benchmarks -m benchmark --codspeed
env:
DAFT_RUNNER: native
- name: Send Slack notification on failure
uses: slackapi/[email protected]
if: ${{ failure() && (github.ref == 'refs/heads/main') }}
@@ -810,6 +802,8 @@ jobs:
source activate
maturin develop
pytest --doctest-modules --continue-on-collection-errors daft/dataframe/dataframe.py daft/expressions/expressions.py daft/convert.py daft/udf.py
env:
DAFT_RUNNER: py

publish-coverage-reports:
name: Publish coverage reports to CodeCov
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default.

62 changes: 52 additions & 10 deletions daft/context.py
@@ -7,6 +7,7 @@
import warnings
from typing import TYPE_CHECKING, ClassVar

from daft import get_build_type
from daft.daft import IOConfig, PyDaftExecutionConfig, PyDaftPlanningConfig

if TYPE_CHECKING:
@@ -27,6 +28,11 @@ class _PyRunnerConfig(_RunnerConfig):
use_thread_pool: bool | None


@dataclasses.dataclass(frozen=True)
class _NativeRunnerConfig(_RunnerConfig):
name = "native"


@dataclasses.dataclass(frozen=True)
class _RayRunnerConfig(_RunnerConfig):
name = "ray"
@@ -42,6 +48,7 @@ def _get_runner_config_from_env() -> _RunnerConfig:
1. PyRunner: set DAFT_RUNNER=py
2. RayRunner: set DAFT_RUNNER=ray and optionally RAY_ADDRESS=ray://...
3. NativeRunner: set DAFT_RUNNER=native
"""
runner_from_envvar = os.getenv("DAFT_RUNNER")

@@ -90,6 +97,8 @@ def _get_runner_config_from_env() -> _RunnerConfig:
)
elif runner_from_envvar and runner_from_envvar.upper() == "PY":
return _PyRunnerConfig(use_thread_pool=use_thread_pool)
elif runner_from_envvar and runner_from_envvar.upper() == "NATIVE":
return _NativeRunnerConfig()
elif runner_from_envvar is not None:
raise ValueError(f"Unsupported DAFT_RUNNER variable: {runner_from_envvar}")

@@ -101,6 +110,10 @@ def _get_runner_config_from_env() -> _RunnerConfig:
force_client_mode=ray_force_client_mode,
)

# Use native runner if in dev mode
elif get_build_type() == "dev":
return _NativeRunnerConfig()

# Fall back on PyRunner
else:
return _PyRunnerConfig(use_thread_pool=use_thread_pool)
@@ -118,7 +131,6 @@ class DaftContext:
_daft_planning_config: PyDaftPlanningConfig = PyDaftPlanningConfig.from_env()

_runner_config: _RunnerConfig | None = None
_disallow_set_runner: bool = False
_runner: Runner | None = None

_instance: ClassVar[DaftContext | None] = None
@@ -178,14 +190,15 @@ def _get_runner(self) -> Runner:

assert isinstance(runner_config, _PyRunnerConfig)
self._runner = PyRunner(use_thread_pool=runner_config.use_thread_pool)
elif runner_config.name == "native":
from daft.runners.native_runner import NativeRunner

assert isinstance(runner_config, _NativeRunnerConfig)
self._runner = NativeRunner()

else:
raise NotImplementedError(f"Runner config not implemented: {runner_config.name}")

# Mark DaftContext as having the runner set, which prevents any subsequent setting of the config
# after the runner has been initialized once
self._disallow_set_runner = True

return self._runner

@property
@@ -194,6 +207,17 @@ def is_ray_runner(self) -> bool:
runner_config = self._get_runner_config()
return isinstance(runner_config, _RayRunnerConfig)

    def can_set_runner(self, new_runner_name: str) -> bool:
        # If the runner has not been set yet, we can set it
        if self._runner_config is None:
            return True
        # If the runner has been set to the ray runner, we can't set it again
        elif self._runner_config.name == "ray":
            return False
        # If the runner has been set to a local runner, we can set it to a new local runner
        else:
            return new_runner_name in {"py", "native"}


_DaftContext = DaftContext()

@@ -229,7 +253,7 @@ def set_runner_ray(

ctx = get_context()
with ctx._lock:
if ctx._disallow_set_runner:
if not ctx.can_set_runner("ray"):
if noop_if_initialized:
warnings.warn(
"Calling daft.context.set_runner_ray(noop_if_initialized=True) multiple times has no effect beyond the first call."
@@ -242,7 +266,7 @@ def set_runner_ray(
max_task_backlog=max_task_backlog,
force_client_mode=force_client_mode,
)
ctx._disallow_set_runner = True
ctx._runner = None
return ctx


@@ -256,11 +280,29 @@ def set_runner_py(use_thread_pool: bool | None = None) -> DaftContext:
"""
ctx = get_context()
with ctx._lock:
if ctx._disallow_set_runner:
if not ctx.can_set_runner("py"):
raise RuntimeError("Cannot set runner more than once")

ctx._runner_config = _PyRunnerConfig(use_thread_pool=use_thread_pool)
ctx._disallow_set_runner = True
ctx._runner = None
return ctx


def set_runner_native() -> DaftContext:
    """Set the runner for executing Daft dataframes to the native runner.

    Alternatively, users can set this behavior via an environment variable: DAFT_RUNNER=native

    Returns:
        DaftContext: Daft context after setting the native runner
    """
    ctx = get_context()
    with ctx._lock:
        if not ctx.can_set_runner("native"):
            raise RuntimeError("Cannot set runner more than once")

        ctx._runner_config = _NativeRunnerConfig()
        ctx._runner = None
        return ctx


@@ -365,7 +407,7 @@ def set_execution_config(
shuffle_aggregation_default_partitions: Maximum number of partitions to create when performing aggregations. Defaults to 200, unless the number of input partitions is less than 200.
read_sql_partition_size_bytes: Target size of partition when reading from SQL databases. Defaults to 512MB
enable_aqe: Enables Adaptive Query Execution, Defaults to False
enable_native_executor: Enables new local executor. Defaults to False
enable_native_executor: Enables the native executor, Defaults to False
default_morsel_size: Default size of morsels used for the new local executor. Defaults to 131072 rows.
"""
# Replace values in the DaftExecutionConfig with user-specified overrides
3 changes: 2 additions & 1 deletion daft/io/writer.py
@@ -175,8 +175,9 @@ def __init__(
self.is_closed = False

def _create_writer(self, schema: pa.Schema) -> pacsv.CSVWriter:
output_file = self.fs.open_output_stream(self.full_path)
return pacsv.CSVWriter(
self.full_path,
output_file,
schema,
)

88 changes: 88 additions & 0 deletions daft/runners/native_runner.py
@@ -0,0 +1,88 @@
from __future__ import annotations

import logging
from typing import TYPE_CHECKING, Iterator

from daft.context import get_context
from daft.daft import FileFormatConfig, FileInfos, IOConfig
from daft.execution.native_executor import NativeExecutor
from daft.filesystem import glob_path_with_stats
from daft.runners import runner_io
from daft.runners.partitioning import (
    LocalMaterializedResult,
    LocalPartitionSet,
    PartitionCacheEntry,
    PartitionSetCache,
)
from daft.runners.runner import LOCAL_PARTITION_SET_CACHE, Runner
from daft.table import MicroPartition

if TYPE_CHECKING:
    from daft.logical.builder import LogicalPlanBuilder

logger = logging.getLogger(__name__)


class NativeRunnerIO(runner_io.RunnerIO):
    def glob_paths_details(
        self,
        source_paths: list[str],
        file_format_config: FileFormatConfig | None = None,
        io_config: IOConfig | None = None,
    ) -> FileInfos:
        file_infos = FileInfos()
        file_format = file_format_config.file_format() if file_format_config is not None else None
        for source_path in source_paths:
            path_file_infos = glob_path_with_stats(source_path, file_format, io_config)

            if len(path_file_infos) == 0:
                raise FileNotFoundError(f"No files found at {source_path}")

            file_infos.extend(path_file_infos)

        return file_infos


class NativeRunner(Runner[MicroPartition]):
    def __init__(self) -> None:
        super().__init__()

    def initialize_partition_set_cache(self) -> PartitionSetCache:
        return LOCAL_PARTITION_SET_CACHE

    def runner_io(self) -> NativeRunnerIO:
        return NativeRunnerIO()

    def run(self, builder: LogicalPlanBuilder) -> PartitionCacheEntry:
        results = list(self.run_iter(builder))

        result_pset = LocalPartitionSet()
        for i, result in enumerate(results):
            result_pset.set_partition(i, result)

        pset_entry = self.put_partition_set_into_cache(result_pset)
        return pset_entry

    def run_iter(
        self,
        builder: LogicalPlanBuilder,
        results_buffer_size: int | None = None,
    ) -> Iterator[LocalMaterializedResult]:
        # NOTE: Freeze and use this same execution config for the entire execution
        daft_execution_config = get_context().daft_execution_config

        # Optimize the logical plan.
        builder = builder.optimize()
        executor = NativeExecutor.from_logical_plan_builder(builder)
        results_gen = executor.run(
            {k: v.values() for k, v in self._part_set_cache.get_all_partition_sets().items()},
            daft_execution_config,
            results_buffer_size,
        )
        yield from results_gen

    def run_iter_tables(
        self, builder: LogicalPlanBuilder, results_buffer_size: int | None = None
    ) -> Iterator[MicroPartition]:
        for result in self.run_iter(builder, results_buffer_size=results_buffer_size):
            yield result.partition()
