diff --git a/Cargo.lock b/Cargo.lock
index b9ae87f30a..df8ace4ebd 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -900,6 +900,14 @@ dependencies = [
  "unicode-width",
 ]
 
+[[package]]
+name = "common-daft-config"
+version = "0.1.10"
+dependencies = [
+ "lazy_static",
+ "pyo3",
+]
+
 [[package]]
 name = "common-error"
 version = "0.1.10"
@@ -1090,6 +1098,7 @@ dependencies = [
 name = "daft"
 version = "0.1.10"
 dependencies = [
+ "common-daft-config",
  "daft-core",
  "daft-csv",
  "daft-dsl",
@@ -1301,6 +1310,7 @@ version = "0.1.10"
 dependencies = [
  "arrow2",
  "bincode",
+ "common-daft-config",
  "common-error",
  "common-io-config",
  "daft-core",
diff --git a/Cargo.toml b/Cargo.toml
index 7fdf514a0e..6c3b7ec674 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -1,4 +1,5 @@
 [dependencies]
+common-daft-config = {path = "src/common/daft-config", default-features = false}
 daft-core = {path = "src/daft-core", default-features = false}
 daft-csv = {path = "src/daft-csv", default-features = false}
 daft-dsl = {path = "src/daft-dsl", default-features = false}
@@ -26,7 +27,8 @@ python = [
   "daft-csv/python",
   "daft-micropartition/python",
   "daft-scan/python",
-  "daft-stats/python"
+  "daft-stats/python",
+  "common-daft-config/python"
 ]
 
 [lib]
@@ -69,6 +71,7 @@ members = [
   "src/common/error",
   "src/common/io-config",
   "src/common/treenode",
+  "src/common/daft-config",
   "src/daft-core",
   "src/daft-io",
   "src/daft-parquet",
diff --git a/daft/context.py b/daft/context.py
index ee7e367fde..2377521db3 100644
--- a/daft/context.py
+++ b/daft/context.py
@@ -6,6 +6,8 @@ import warnings
 from typing import TYPE_CHECKING, ClassVar
 
+from daft.daft import PyDaftConfig
+
 if TYPE_CHECKING:
     from daft.runners.runner import Runner
 
@@ -59,6 +61,7 @@ def _get_runner_config_from_env() -> _RunnerConfig:
 class DaftContext:
     """Global context for the current Daft execution environment"""
 
+    daft_config: PyDaftConfig = PyDaftConfig()
     runner_config: _RunnerConfig = dataclasses.field(default_factory=_get_runner_config_from_env)
     disallow_set_runner: bool = False
 
@@ -185,3 +188,34 @@ def set_runner_py(use_thread_pool: bool | None = None) -> DaftContext:
     )
     _set_context(new_ctx)
     return new_ctx
+
+
+def set_config(
+    merge_scan_tasks_min_size_bytes: int | None = None,
+    merge_scan_tasks_max_size_bytes: int | None = None,
+) -> DaftContext:
+    """Globally sets configuration parameters which control various aspects of Daft execution
+
+    Args:
+        merge_scan_tasks_min_size_bytes: Minimum size in bytes when merging ScanTasks when reading files from storage.
+            Increasing this value will make Daft perform more merging of files into a single partition before yielding,
+            which leads to bigger but fewer partitions. (Defaults to 64MB)
+        merge_scan_tasks_max_size_bytes: Maximum size in bytes when merging ScanTasks when reading files from storage.
+            Increasing this value will increase the upper bound of the size of merged ScanTasks, which leads to bigger but
+            fewer partitions. (Defaults to 512MB)
+    """
+    old_ctx = get_context()
+
+    # Replace values in the DaftConfig with user-specified overrides
+    old_daft_config = old_ctx.daft_config
+    new_daft_config = old_daft_config.with_config_values(
+        merge_scan_tasks_min_size_bytes=merge_scan_tasks_min_size_bytes,
+        merge_scan_tasks_max_size_bytes=merge_scan_tasks_max_size_bytes,
+    )
+
+    new_ctx = dataclasses.replace(
+        old_ctx,
+        daft_config=new_daft_config,
+    )
+    _set_context(new_ctx)
+    return new_ctx
diff --git a/daft/daft.pyi b/daft/daft.pyi
index 7215e024fd..0ff42dda9f 100644
--- a/daft/daft.pyi
+++ b/daft/daft.pyi
@@ -990,9 +990,20 @@ class LogicalPlanBuilder:
     ) -> LogicalPlanBuilder: ...
     def schema(self) -> PySchema: ...
     def optimize(self) -> LogicalPlanBuilder: ...
-    def to_physical_plan_scheduler(self) -> PhysicalPlanScheduler: ...
+    def to_physical_plan_scheduler(self, cfg: PyDaftConfig) -> PhysicalPlanScheduler: ...
     def repr_ascii(self, simple: bool) -> str: ...
 
+class PyDaftConfig:
+    def with_config_values(
+        self,
+        merge_scan_tasks_min_size_bytes: int | None = None,
+        merge_scan_tasks_max_size_bytes: int | None = None,
+    ) -> PyDaftConfig: ...
+    @property
+    def merge_scan_tasks_min_size_bytes(self): ...
+    @property
+    def merge_scan_tasks_max_size_bytes(self): ...
+
 def build_type() -> str: ...
 def version() -> str: ...
 def __getattr__(name) -> Any: ...
diff --git a/daft/dataframe/dataframe.py b/daft/dataframe/dataframe.py
index 91ddbe17b8..ebcd8ddcc2 100644
--- a/daft/dataframe/dataframe.py
+++ b/daft/dataframe/dataframe.py
@@ -125,7 +125,8 @@ def explain(self, show_optimized: bool = False, simple=False) -> None:
         print(builder.pretty_print(simple))
 
     def num_partitions(self) -> int:
-        return self.__builder.to_physical_plan_scheduler().num_partitions()
+        daft_config = get_context().daft_config
+        return self.__builder.to_physical_plan_scheduler(daft_config).num_partitions()
 
     @DataframePublicAPI
     def schema(self) -> Schema:
diff --git a/daft/logical/builder.py b/daft/logical/builder.py
index cc30c372f6..d6da91ff78 100644
--- a/daft/logical/builder.py
+++ b/daft/logical/builder.py
@@ -14,6 +14,7 @@ from daft.daft import LogicalPlanBuilder as _LogicalPlanBuilder
 from daft.daft import (
     PartitionScheme,
+    PyDaftConfig,
     ResourceRequest,
     ScanOperatorHandle,
     StorageConfig,
@@ -34,7 +35,7 @@ class LogicalPlanBuilder:
     def __init__(self, builder: _LogicalPlanBuilder) -> None:
         self._builder = builder
 
-    def to_physical_plan_scheduler(self) -> PhysicalPlanScheduler:
+    def to_physical_plan_scheduler(self, daft_config: PyDaftConfig) -> PhysicalPlanScheduler:
         """
         Convert the underlying logical plan to a physical plan scheduler, which is used to generate executable tasks
         for the physical plan.
@@ -43,7 +44,7 @@ def to_physical_plan_scheduler(self) -> PhysicalPlanScheduler:
         """
         from daft.plan_scheduler.physical_plan_scheduler import PhysicalPlanScheduler
 
-        return PhysicalPlanScheduler(self._builder.to_physical_plan_scheduler())
+        return PhysicalPlanScheduler(self._builder.to_physical_plan_scheduler(daft_config))
 
     def schema(self) -> Schema:
         """
diff --git a/daft/runners/pyrunner.py b/daft/runners/pyrunner.py
index 7af9b7d9e0..be4dc16feb 100644
--- a/daft/runners/pyrunner.py
+++ b/daft/runners/pyrunner.py
@@ -8,6 +8,7 @@
 
 import psutil
 
+from daft.context import get_context
 from daft.daft import (
     FileFormatConfig,
     FileInfos,
@@ -131,11 +132,13 @@ def run_iter(
         # NOTE: PyRunner does not run any async execution, so it ignores `results_buffer_size` which is essentially 0
         results_buffer_size: int | None = None,
     ) -> Iterator[PyMaterializedResult]:
+        daft_config = get_context().daft_config
+
         # Optimize the logical plan.
         builder = builder.optimize()
         # Finalize the logical plan and get a physical plan scheduler for translating the
         # physical plan to executable tasks.
-        plan_scheduler = builder.to_physical_plan_scheduler()
+        plan_scheduler = builder.to_physical_plan_scheduler(daft_config)
         psets = {
             key: entry.value.values()
             for key, entry in self._part_set_cache._uuid_to_partition_set.items()
diff --git a/daft/runners/ray_runner.py b/daft/runners/ray_runner.py
index 4e457caf38..14fceae5fc 100644
--- a/daft/runners/ray_runner.py
+++ b/daft/runners/ray_runner.py
@@ -11,6 +11,7 @@
 
 import pyarrow as pa
 
+from daft.context import get_context
 from daft.logical.builder import LogicalPlanBuilder
 from daft.plan_scheduler import PhysicalPlanScheduler
 from daft.runners.progress_bar import ProgressBar
@@ -675,11 +676,13 @@ def active_plans(self) -> list[str]:
     def run_iter(
         self, builder: LogicalPlanBuilder, results_buffer_size: int | None = None
     ) -> Iterator[RayMaterializedResult]:
+        daft_config = get_context().daft_config
+
         # Optimize the logical plan.
         builder = builder.optimize()
         # Finalize the logical plan and get a physical plan scheduler for translating the
         # physical plan to executable tasks.
-        plan_scheduler = builder.to_physical_plan_scheduler()
+        plan_scheduler = builder.to_physical_plan_scheduler(daft_config)
 
         psets = {
             key: entry.value.values()
diff --git a/src/common/daft-config/Cargo.toml b/src/common/daft-config/Cargo.toml
new file mode 100644
index 0000000000..bbd9878d6c
--- /dev/null
+++ b/src/common/daft-config/Cargo.toml
@@ -0,0 +1,12 @@
+[dependencies]
+lazy_static = {workspace = true}
+pyo3 = {workspace = true, optional = true}
+
+[features]
+default = ["python"]
+python = ["dep:pyo3"]
+
+[package]
+edition = {workspace = true}
+name = "common-daft-config"
+version = {workspace = true}
diff --git a/src/common/daft-config/src/lib.rs b/src/common/daft-config/src/lib.rs
new file mode 100644
index 0000000000..77a199d867
--- /dev/null
+++ b/src/common/daft-config/src/lib.rs
@@ -0,0 +1,30 @@
+#[derive(Clone)]
+pub struct DaftConfig {
+    pub merge_scan_tasks_min_size_bytes: usize,
+    pub merge_scan_tasks_max_size_bytes: usize,
+}
+
+impl Default for DaftConfig {
+    fn default() -> Self {
+        DaftConfig {
+            merge_scan_tasks_min_size_bytes: 64 * 1024 * 1024,  // 64MB
+            merge_scan_tasks_max_size_bytes: 512 * 1024 * 1024, // 512MB
+        }
+    }
+}
+
+#[cfg(feature = "python")]
+mod python;
+
+#[cfg(feature = "python")]
+pub use python::PyDaftConfig;
+
+#[cfg(feature = "python")]
+use pyo3::prelude::*;
+
+#[cfg(feature = "python")]
+pub fn register_modules(_py: Python, parent: &PyModule) -> PyResult<()> {
+    parent.add_class::<PyDaftConfig>()?;
+
+    Ok(())
+}
diff --git a/src/common/daft-config/src/python.rs b/src/common/daft-config/src/python.rs
new file mode 100644
index 0000000000..3dd76aad98
--- /dev/null
+++ b/src/common/daft-config/src/python.rs
@@ -0,0 +1,48 @@
+use std::sync::Arc;
+
+use pyo3::prelude::*;
+
+use crate::DaftConfig;
+
+#[derive(Clone, Default)]
+#[pyclass]
+pub struct PyDaftConfig {
+    pub config: Arc<DaftConfig>,
+}
+
+#[pymethods]
+impl PyDaftConfig {
+    #[new]
+    pub fn new() -> Self {
+        PyDaftConfig::default()
+    }
+
+    fn with_config_values(
+        &mut self,
+        merge_scan_tasks_min_size_bytes: Option<usize>,
+        merge_scan_tasks_max_size_bytes: Option<usize>,
+    ) -> PyResult<PyDaftConfig> {
+        let mut config = self.config.as_ref().clone();
+
+        if let Some(merge_scan_tasks_max_size_bytes) = merge_scan_tasks_max_size_bytes {
+            config.merge_scan_tasks_max_size_bytes = merge_scan_tasks_max_size_bytes;
+        }
+        if let Some(merge_scan_tasks_min_size_bytes) = merge_scan_tasks_min_size_bytes {
+            config.merge_scan_tasks_min_size_bytes = merge_scan_tasks_min_size_bytes;
+        }
+
+        Ok(PyDaftConfig {
+            config: Arc::new(config),
+        })
+    }
+
+    #[getter(merge_scan_tasks_min_size_bytes)]
+    fn get_merge_scan_tasks_min_size_bytes(&self) -> PyResult<usize> {
+        Ok(self.config.merge_scan_tasks_min_size_bytes)
+    }
+
+    #[getter(merge_scan_tasks_max_size_bytes)]
+    fn get_merge_scan_tasks_max_size_bytes(&self) -> PyResult<usize> {
+        Ok(self.config.merge_scan_tasks_max_size_bytes)
+    }
+}
diff --git a/src/daft-plan/Cargo.toml b/src/daft-plan/Cargo.toml
index f9c683ef77..d05a91f73a 100644
--- a/src/daft-plan/Cargo.toml
+++ b/src/daft-plan/Cargo.toml
@@ -1,6 +1,7 @@
 [dependencies]
 arrow2 = {workspace = true, features = ["chrono-tz", "compute_take", "compute_cast", "compute_aggregate", "compute_if_then_else", "compute_sort", "compute_filter", "compute_temporal", "compute_comparison", "compute_arithmetics", "compute_concatenate", "io_ipc"]}
 bincode = {workspace = true}
+common-daft-config = {path = "../common/daft-config", default-features = false}
 common-error = {path = "../common/error", default-features = false}
 common-io-config = {path = "../common/io-config", default-features = false}
 daft-core = {path = "../daft-core", default-features = false}
@@ -20,7 +21,7 @@ rstest = {workspace = true}
 
 [features]
 default = ["python"]
-python = ["dep:pyo3", "common-error/python", "common-io-config/python", "daft-core/python", "daft-dsl/python", "daft-table/python"]
+python = ["dep:pyo3", "common-error/python", "common-io-config/python", "common-daft-config/python", "daft-core/python", "daft-dsl/python", "daft-table/python"]
 
 [package]
 edition = {workspace = true}
diff --git a/src/daft-plan/src/builder.rs b/src/daft-plan/src/builder.rs
index 80780b1a45..238f80451b 100644
--- a/src/daft-plan/src/builder.rs
+++ b/src/daft-plan/src/builder.rs
@@ -26,6 +26,7 @@ use daft_scan::{
 #[cfg(feature = "python")]
 use {
     crate::{physical_plan::PhysicalPlan, source_info::InMemoryInfo},
+    common_daft_config::PyDaftConfig,
     daft_core::python::schema::PySchema,
     daft_dsl::python::PyExpr,
     daft_scan::{file_format::PyFileFormatConfig, python::pylib::ScanOperatorHandle},
@@ -476,10 +477,15 @@ impl PyLogicalPlanBuilder {
     /// Finalize the logical plan, translate the logical plan to a physical plan, and return
     /// a physical plan scheduler that's capable of launching the work necessary to compute the output
     /// of the physical plan.
-    pub fn to_physical_plan_scheduler(&self, py: Python) -> PyResult<PhysicalPlanScheduler> {
+    pub fn to_physical_plan_scheduler(
+        &self,
+        py: Python,
+        cfg: PyDaftConfig,
+    ) -> PyResult<PhysicalPlanScheduler> {
         py.allow_threads(|| {
             let logical_plan = self.builder.build();
-            let physical_plan: Arc<PhysicalPlan> = plan(logical_plan.as_ref())?.into();
+            let physical_plan: Arc<PhysicalPlan> =
+                plan(logical_plan.as_ref(), cfg.config.clone())?.into();
             Ok(physical_plan.into())
         })
     }
diff --git a/src/daft-plan/src/physical_ops/project.rs b/src/daft-plan/src/physical_ops/project.rs
index 50000e4eea..093ba9ea8c 100644
--- a/src/daft-plan/src/physical_ops/project.rs
+++ b/src/daft-plan/src/physical_ops/project.rs
@@ -186,6 +186,7 @@ impl Project {
 
 #[cfg(test)]
 mod tests {
+    use common_daft_config::DaftConfig;
     use common_error::DaftResult;
     use daft_core::{datatypes::Field, DataType};
     use daft_dsl::{col, lit, Expr};
@@ -197,6 +198,7 @@ mod tests {
     /// do not destroy the partition spec.
     #[test]
     fn test_partition_spec_preserving() -> DaftResult<()> {
+        let cfg = DaftConfig::default().into();
        let expressions = vec![
            (col("a") % lit(2)), // this is now "a"
            col("b"),
@@ -215,7 +217,7 @@ mod tests {
        .project(expressions, Default::default())?
        .build();
 
-        let physical_plan = plan(&logical_plan)?;
+        let physical_plan = plan(&logical_plan, cfg)?;
 
        let expected_pspec =
            PartitionSpec::new_internal(PartitionScheme::Hash, 3, Some(vec![col("aa"), col("b")]));
@@ -240,6 +242,7 @@ mod tests {
        )]
        projection: Vec<Expr>,
    ) -> DaftResult<()> {
+        let cfg = DaftConfig::default().into();
        let logical_plan = dummy_scan_node(vec![
            Field::new("a", DataType::Int64),
            Field::new("b", DataType::Int64),
@@ -253,7 +256,7 @@ mod tests {
        .project(projection, Default::default())?
        .build();
 
-        let physical_plan = plan(&logical_plan)?;
+        let physical_plan = plan(&logical_plan, cfg)?;
 
        let expected_pspec = PartitionSpec::new_internal(PartitionScheme::Unknown, 3, None);
        assert_eq!(
@@ -268,6 +271,7 @@ mod tests {
    /// i.e. ("a", "a" as "b") remains partitioned by "a", not "b"
    #[test]
    fn test_partition_spec_prefer_existing_names() -> DaftResult<()> {
+        let cfg = DaftConfig::default().into();
        let expressions = vec![col("a").alias("y"), col("a"), col("a").alias("z"), col("b")];
 
        let logical_plan = dummy_scan_node(vec![
@@ -283,7 +287,7 @@ mod tests {
        .project(expressions, Default::default())?
        .build();
 
-        let physical_plan = plan(&logical_plan)?;
+        let physical_plan = plan(&logical_plan, cfg)?;
 
        let expected_pspec =
            PartitionSpec::new_internal(PartitionScheme::Hash, 3, Some(vec![col("a"), col("b")]));
diff --git a/src/daft-plan/src/planner.rs b/src/daft-plan/src/planner.rs
index c28df0c262..9e23a1c627 100644
--- a/src/daft-plan/src/planner.rs
+++ b/src/daft-plan/src/planner.rs
@@ -2,6 +2,7 @@ use std::cmp::Ordering;
 use std::sync::Arc;
 use std::{cmp::max, collections::HashMap};
 
+use common_daft_config::DaftConfig;
 use common_error::DaftResult;
 use daft_core::count_mode::CountMode;
 use daft_dsl::Expr;
@@ -25,7 +26,7 @@ use crate::{FileFormat, PartitionScheme};
 use crate::physical_ops::InMemoryScan;
 
 /// Translate a logical plan to a physical plan.
-pub fn plan(logical_plan: &LogicalPlan) -> DaftResult<PhysicalPlan> {
+pub fn plan(logical_plan: &LogicalPlan, cfg: Arc<DaftConfig>) -> DaftResult<PhysicalPlan> {
     match logical_plan {
         LogicalPlan::Source(Source {
             output_schema,
@@ -81,8 +82,8 @@ pub fn plan(logical_plan: &LogicalPlan) -> DaftResult<PhysicalPlan> {
                 // Apply transformations on the ScanTasks to optimize
                 let scan_tasks = daft_scan::scan_task_iters::merge_by_sizes(
                     scan_tasks,
-                    64 * 1024 * 1024,
-                    512 * 1024 * 1024,
+                    cfg.merge_scan_tasks_min_size_bytes,
+                    cfg.merge_scan_tasks_max_size_bytes,
                 );
                 let scan_tasks = scan_tasks.collect::<DaftResult<Vec<_>>>()?;
@@ -113,7 +114,7 @@
             resource_request,
             ..
         }) => {
-            let input_physical = plan(input)?;
+            let input_physical = plan(input, cfg)?;
             let partition_spec = input_physical.partition_spec().clone();
             Ok(PhysicalPlan::Project(Project::try_new(
                 input_physical.into(),
@@ -123,7 +124,7 @@
             )?))
         }
         LogicalPlan::Filter(LogicalFilter { input, predicate }) => {
-            let input_physical = plan(input)?;
+            let input_physical = plan(input, cfg)?;
             Ok(PhysicalPlan::Filter(Filter::new(
                 input_physical.into(),
                 predicate.clone(),
@@ -134,7 +135,7 @@
             limit,
             eager,
         }) => {
-            let input_physical = plan(input)?;
+            let input_physical = plan(input, cfg)?;
             let num_partitions = input_physical.partition_spec().num_partitions;
             Ok(PhysicalPlan::Limit(Limit::new(
                 input_physical.into(),
@@ -146,7 +147,7 @@
         LogicalPlan::Explode(LogicalExplode {
             input, to_explode, ..
         }) => {
-            let input_physical = plan(input)?;
+            let input_physical = plan(input, cfg)?;
             Ok(PhysicalPlan::Explode(Explode::new(
                 input_physical.into(),
                 to_explode.clone(),
@@ -157,7 +158,7 @@
             sort_by,
             descending,
         }) => {
-            let input_physical = plan(input)?;
+            let input_physical = plan(input, cfg)?;
             let num_partitions = input_physical.partition_spec().num_partitions;
             Ok(PhysicalPlan::Sort(Sort::new(
                 input_physical.into(),
@@ -177,7 +178,7 @@
             // are consistent, which is only the case if boundary sampling is deterministic within a query.
             assert!(!matches!(scheme, PartitionScheme::Range));
 
-            let input_physical = plan(input)?;
+            let input_physical = plan(input, cfg)?;
             let input_partition_spec = input_physical.partition_spec();
             let input_num_partitions = input_partition_spec.num_partitions;
             let num_partitions = num_partitions.unwrap_or(input_num_partitions);
@@ -242,7 +243,7 @@
             Ok(repartitioned_plan)
         }
         LogicalPlan::Distinct(LogicalDistinct { input }) => {
-            let input_physical = plan(input)?;
+            let input_physical = plan(input, cfg)?;
             let col_exprs = input
                 .schema()
                 .names()
@@ -279,7 +280,7 @@
         }) => {
             use daft_dsl::AggExpr::{self, *};
             use daft_dsl::Expr::Column;
-            let input_plan = plan(input)?;
+            let input_plan = plan(input, cfg)?;
 
             let num_input_partitions = input_plan.partition_spec().num_partitions;
@@ -469,8 +470,8 @@
             Ok(result_plan)
         }
         LogicalPlan::Concat(LogicalConcat { input, other }) => {
-            let input_physical = plan(input)?;
-            let other_physical = plan(other)?;
+            let input_physical = plan(input, cfg.clone())?;
+            let other_physical = plan(other, cfg.clone())?;
             Ok(PhysicalPlan::Concat(Concat::new(
                 input_physical.into(),
                 other_physical.into(),
@@ -484,8 +485,8 @@
             join_type,
             ..
         }) => {
-            let mut left_physical = plan(left)?;
-            let mut right_physical = plan(right)?;
+            let mut left_physical = plan(left, cfg.clone())?;
+            let mut right_physical = plan(right, cfg.clone())?;
             let left_pspec = left_physical.partition_spec();
             let right_pspec = right_physical.partition_spec();
             let num_partitions = max(left_pspec.num_partitions, right_pspec.num_partitions);
@@ -532,7 +533,7 @@
             sink_info,
             input,
         }) => {
-            let input_physical = plan(input)?;
+            let input_physical = plan(input, cfg)?;
             match sink_info.as_ref() {
                 SinkInfo::OutputFileInfo(file_info @ OutputFileInfo { file_format, .. }) => {
                     match file_format {
@@ -564,10 +565,12 @@
 
 #[cfg(test)]
 mod tests {
+    use common_daft_config::DaftConfig;
     use common_error::DaftResult;
     use daft_core::{datatypes::Field, DataType};
     use daft_dsl::{col, lit, AggExpr, Expr};
     use std::assert_matches::assert_matches;
+    use std::sync::Arc;
 
     use crate::physical_plan::PhysicalPlan;
     use crate::planner::plan;
@@ -578,6 +581,7 @@ mod tests {
     /// Repartition-upstream_op -> upstream_op
     #[test]
     fn repartition_dropped_redundant_into_partitions() -> DaftResult<()> {
+        let cfg: Arc<DaftConfig> = DaftConfig::default().into();
         // dummy_scan_node() will create the default PartitionSpec, which only has a single partition.
         let builder = dummy_scan_node(vec![
             Field::new("a", DataType::Int64),
@@ -586,7 +590,7 @@ mod tests {
             Field::new("b", DataType::Utf8),
         ])
         .repartition(Some(10), vec![], PartitionScheme::Unknown)?
        .filter(col("a").lt(&lit(2)))?;
        assert_eq!(
-            plan(builder.build().as_ref())?
+            plan(builder.build().as_ref(), cfg.clone())?
                .partition_spec()
                .num_partitions,
            10
@@ -594,7 +598,7 @@ mod tests {
        );
        let logical_plan = builder
            .repartition(Some(10), vec![], PartitionScheme::Unknown)?
            .build();
-        let physical_plan = plan(logical_plan.as_ref())?;
+        let physical_plan = plan(logical_plan.as_ref(), cfg.clone())?;
        // Check that the last repartition was dropped (the last op should be the filter).
        assert_matches!(physical_plan, PhysicalPlan::Filter(_));
        Ok(())
    }
@@ -605,13 +609,14 @@ mod tests {
    /// Repartition-upstream_op -> upstream_op
    #[test]
    fn repartition_dropped_single_partition() -> DaftResult<()> {
+        let cfg: Arc<DaftConfig> = DaftConfig::default().into();
        // dummy_scan_node() will create the default PartitionSpec, which only has a single partition.
        let builder = dummy_scan_node(vec![
            Field::new("a", DataType::Int64),
            Field::new("b", DataType::Utf8),
        ]);
        assert_eq!(
-            plan(builder.build().as_ref())?
+            plan(builder.build().as_ref(), cfg.clone())?
                .partition_spec()
                .num_partitions,
            1
@@ -619,7 +624,7 @@ mod tests {
        );
        let logical_plan = builder
            .repartition(Some(1), vec![col("a")], PartitionScheme::Hash)?
            .build();
-        let physical_plan = plan(logical_plan.as_ref())?;
+        let physical_plan = plan(logical_plan.as_ref(), cfg.clone())?;
        assert_matches!(physical_plan, PhysicalPlan::TabularScanJson(_));
        Ok(())
    }
@@ -629,6 +634,7 @@ mod tests {
    /// Repartition-upstream_op -> upstream_op
    #[test]
    fn repartition_dropped_same_partition_spec() -> DaftResult<()> {
+        let cfg = DaftConfig::default().into();
        let logical_plan = dummy_scan_node(vec![
            Field::new("a", DataType::Int64),
            Field::new("b", DataType::Utf8),
        ])
@@ -637,7 +643,7 @@ mod tests {
        .filter(col("a").lt(&lit(2)))?
        .repartition(Some(10), vec![col("a")], PartitionScheme::Hash)?
        .build();
-        let physical_plan = plan(logical_plan.as_ref())?;
+        let physical_plan = plan(logical_plan.as_ref(), cfg)?;
        // Check that the last repartition was dropped (the last op should be the filter).
        assert_matches!(physical_plan, PhysicalPlan::Filter(_));
        Ok(())
    }
@@ -648,6 +654,7 @@ mod tests {
    /// Repartition-Aggregation -> Aggregation
    #[test]
    fn repartition_dropped_same_partition_spec_agg() -> DaftResult<()> {
+        let cfg = DaftConfig::default().into();
        let logical_plan = dummy_scan_node(vec![
            Field::new("a", DataType::Int64),
            Field::new("b", DataType::Int64),
        ])
        .aggregate(
            vec![Expr::Agg(AggExpr::Sum(col("a").into()))],
            vec![col("b")],
        )?
@@ -659,7 +666,7 @@ mod tests {
        .repartition(Some(10), vec![col("b")], PartitionScheme::Hash)?
        .build();
-        let physical_plan = plan(logical_plan.as_ref())?;
+        let physical_plan = plan(logical_plan.as_ref(), cfg)?;
        // Check that the last repartition was dropped (the last op should be a projection for a multi-partition aggregation).
        assert_matches!(physical_plan, PhysicalPlan::Project(_));
        Ok(())
    }
diff --git a/src/lib.rs b/src/lib.rs
index dc7b837772..fce2431e7c 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -59,6 +59,7 @@ pub mod pylib {
         daft_plan::register_modules(_py, m)?;
         daft_micropartition::register_modules(_py, m)?;
         daft_scan::register_modules(_py, m)?;
+        common_daft_config::register_modules(_py, m)?;
 
         m.add_wrapped(wrap_pyfunction!(version))?;
         m.add_wrapped(wrap_pyfunction!(build_type))?;
diff --git a/tests/conftest.py b/tests/conftest.py
index dfb396c053..805b33e288 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -10,6 +10,16 @@
 from daft.table import MicroPartition
 
 
+@pytest.fixture(scope="session", autouse=True)
+def set_configs():
+    """Sets global Daft config for testing"""
+    daft.context.set_config(
+        # Disables merging of ScanTasks
+        merge_scan_tasks_min_size_bytes=0,
+        merge_scan_tasks_max_size_bytes=0,
+    )
+
+
 def pytest_configure(config):
     config.addinivalue_line(
         "markers", "integration: mark test as an integration test that runs with external dependencies"
diff --git a/tests/io/test_merge_scan_tasks.py b/tests/io/test_merge_scan_tasks.py
new file mode 100644
index 0000000000..8ff6b47ad0
--- /dev/null
+++ b/tests/io/test_merge_scan_tasks.py
@@ -0,0 +1,69 @@
+from __future__ import annotations
+
+import contextlib
+
+import pytest
+
+import daft
+
+
+@contextlib.contextmanager
+def override_merge_scan_tasks_configs(merge_scan_tasks_min_size_bytes: int, merge_scan_tasks_max_size_bytes: int):
+    config = daft.context.get_context().daft_config
+    original_merge_scan_tasks_min_size_bytes = config.merge_scan_tasks_min_size_bytes
+    original_merge_scan_tasks_max_size_bytes = config.merge_scan_tasks_max_size_bytes
+
+    try:
+        daft.context.set_config(
+            merge_scan_tasks_min_size_bytes=merge_scan_tasks_min_size_bytes,
+            merge_scan_tasks_max_size_bytes=merge_scan_tasks_max_size_bytes,
+        )
+        yield
+    finally:
+        daft.context.set_config(
+            merge_scan_tasks_min_size_bytes=original_merge_scan_tasks_min_size_bytes,
+            merge_scan_tasks_max_size_bytes=original_merge_scan_tasks_max_size_bytes,
+        )
+
+
+@pytest.fixture(scope="function")
+def csv_files(tmpdir):
+    """Writes 3 CSV files, each 10 bytes in size, to tmpdir and returns tmpdir"""
+
+    for i in range(3):
+        path = tmpdir / f"file.{i}.csv"
+        path.write_text("a,b,c\n1,2,", "utf8")  # 10 bytes
+
+    return tmpdir
+
+
+def test_merge_scan_task_exceed_max(csv_files):
+    with override_merge_scan_tasks_configs(0, 0):
+        df = daft.read_csv(str(csv_files))
+        assert (
+            df.num_partitions() == 3
+        ), "Should have 3 partitions since all merges are more than the maximum (>0 bytes)"
+
+
+def test_merge_scan_task_below_max(csv_files):
+    with override_merge_scan_tasks_configs(1, 20):
+        df = daft.read_csv(str(csv_files))
+        assert (
+            df.num_partitions() == 2
+        ), "Should have 2 partitions [(CSV1, CSV2), (CSV3)] since the second merge is too large (>20 bytes)"
+
+
+def test_merge_scan_task_above_min(csv_files):
+    with override_merge_scan_tasks_configs(0, 40):
+        df = daft.read_csv(str(csv_files))
+        assert (
+            df.num_partitions() == 2
+        ), "Should have 2 partitions [(CSV1, CSV2), (CSV3)] since the first merge is above the minimum (>0 bytes)"
+
+
+def test_merge_scan_task_below_min(csv_files):
+    with override_merge_scan_tasks_configs(35, 40):
+        df = daft.read_csv(str(csv_files))
+        assert (
+            df.num_partitions() == 1
+        ), "Should have 1 partition [(CSV1, CSV2, CSV3)] since both merges are below the minimum and maximum"
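Usage sketch (illustrative, not part of the patch): the snippet below shows how the knobs this diff introduces are expected to be exercised end-to-end, based on the set_config docstring and the tests above. The CSV glob path is hypothetical; set_config, get_context().daft_config, and the two merge_scan_tasks_* properties are the APIs defined in this diff.

    import daft
    from daft.context import get_context, set_config

    # Merge ScanTasks more aggressively than the 64MB/512MB defaults:
    # target merged partitions between 128MB and 1GB.
    set_config(
        merge_scan_tasks_min_size_bytes=128 * 1024 * 1024,
        merge_scan_tasks_max_size_bytes=1024 * 1024 * 1024,
    )

    # The overrides are readable back off the global context.
    cfg = get_context().daft_config
    assert cfg.merge_scan_tasks_min_size_bytes == 128 * 1024 * 1024

    # Subsequent reads fold small files into fewer, larger partitions.
    df = daft.read_csv("data/*.csv")  # hypothetical path
    print(df.num_partitions())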