diff --git a/dask/dataframe/backends.py b/dask/dataframe/backends.py index 9ef5d1b360f..5daaec7819b 100644 --- a/dask/dataframe/backends.py +++ b/dask/dataframe/backends.py @@ -703,8 +703,8 @@ def categorical_dtype_pandas(categories=None, ordered=False): return pd.api.types.CategoricalDtype(categories=categories, ordered=ordered) -@tolist_dispatch.register((pd.Series, pd.Index, pd.Categorical)) -def tolist_pandas(obj): +@tolist_dispatch.register((np.ndarray, pd.Series, pd.Index, pd.Categorical)) +def tolist_numpy_or_pandas(obj): return obj.tolist() @@ -770,11 +770,13 @@ def to_backend(cls, data: _Frame, **kwargs): @make_meta_dispatch.register_lazy("cudf") @make_meta_obj.register_lazy("cudf") @percentile_lookup.register_lazy("cudf") +@tolist_dispatch.register_lazy("cudf") def _register_cudf(): import dask_cudf # noqa: F401 @meta_lib_from_array.register_lazy("cupy") +@tolist_dispatch.register_lazy("cupy") def _register_cupy_to_cudf(): # Handle cupy.ndarray -> cudf.DataFrame dispatching try: @@ -786,5 +788,9 @@ def meta_lib_from_array_cupy(x): # cupy -> cudf return cudf + @tolist_dispatch.register(cupy.ndarray) + def tolist_cupy(x): + return x.tolist() + except ImportError: pass diff --git a/dask/dataframe/partitionquantiles.py b/dask/dataframe/partitionquantiles.py index a0c24300dd2..0318d3d7f7a 100644 --- a/dask/dataframe/partitionquantiles.py +++ b/dask/dataframe/partitionquantiles.py @@ -79,6 +79,7 @@ from dask.base import tokenize from dask.dataframe.core import Series +from dask.dataframe.dispatch import tolist_dispatch from dask.utils import is_cupy_type, random_state_data @@ -262,7 +263,7 @@ def percentiles_to_weights(qs, vals, length): return () diff = np.ediff1d(qs, 0.0, 0.0) weights = 0.5 * length * (diff[1:] + diff[:-1]) - return vals.tolist(), weights.tolist() + return tolist_dispatch(vals), weights.tolist() def merge_and_compress_summaries(vals_and_weights): @@ -408,9 +409,6 @@ def percentiles_summary(df, num_old, num_new, upsample, state): Scale factor to increase the number of percentiles calculated in each partition. Use to improve accuracy. """ - from dask.array.dispatch import percentile_lookup as _percentile - from dask.array.utils import array_safe - length = len(df) if length == 0: return () @@ -425,12 +423,22 @@ def percentiles_summary(df, num_old, num_new, upsample, state): elif is_datetime64_dtype(data.dtype) or is_integer_dtype(data.dtype): interpolation = "nearest" - # FIXME: pandas quantile doesn't work with some data types (e.g. strings). - # We fall back to an ndarray as a workaround. + # FIXME: Series.quantile doesn't work with some data types (e.g. strings). + # We try using DataFrame.quantile(..., method="table") as a workaround. try: vals = data.quantile(q=qs / 100, interpolation=interpolation).values except (TypeError, NotImplementedError): - vals, _ = _percentile(array_safe(data, like=data.values), qs, interpolation) + interpolation = "nearest" + vals = ( + data.to_frame() + .quantile( + q=qs / 100, + interpolation=interpolation, + numeric_only=False, + method="table", + ) + .iloc[:, 0] + ) if ( is_cupy_type(data) diff --git a/dask/dataframe/shuffle.py b/dask/dataframe/shuffle.py index 4698358495b..f0d00c4d3ed 100644 --- a/dask/dataframe/shuffle.py +++ b/dask/dataframe/shuffle.py @@ -120,13 +120,13 @@ def sort_values( df: DataFrame, by: str | list[str], npartitions: int | Literal["auto"] | None = None, + shuffle: str | None = None, ascending: bool | list[bool] = True, na_position: Literal["first"] | Literal["last"] = "last", upsample: float = 1.0, partition_size: float = 128e6, sort_function: Callable[[pd.DataFrame], pd.DataFrame] | None = None, sort_function_kwargs: Mapping[str, Any] | None = None, - **kwargs, ) -> DataFrame: """See DataFrame.sort_values for docstring""" if na_position not in ("first", "last"): @@ -176,7 +176,7 @@ def sort_values( f"Dask currently only supports a single boolean for ascending. You passed {str(ascending)}" ) - divisions, mins, maxes, presorted = _calculate_divisions( + divisions, _, _, presorted = _calculate_divisions( df, sort_by_col, repartition, npartitions, upsample, partition_size, ascending ) @@ -193,6 +193,7 @@ def sort_values( df, by[0], divisions, + shuffle=shuffle, ascending=ascending, na_position=na_position, duplicates=False, diff --git a/dask/dataframe/tests/test_shuffle.py b/dask/dataframe/tests/test_shuffle.py index 80b2bc4311e..9df246f73e6 100644 --- a/dask/dataframe/tests/test_shuffle.py +++ b/dask/dataframe/tests/test_shuffle.py @@ -1482,6 +1482,22 @@ def test_sort_values(nelem, by, ascending): dd.assert_eq(got, expect, check_index=False, sort_results=False) +@pytest.mark.parametrize( + "backend", ["pandas", pytest.param("cudf", marks=pytest.mark.gpu)] +) +@pytest.mark.parametrize("by", ["x", "z", ["x", "z"], ["z", "x"]]) +@pytest.mark.parametrize("ascending", [True, False]) +def test_sort_values_tasks_backend(backend, by, ascending): + pdf = pd.DataFrame( + {"x": range(10), "y": [1, 2, 3, 4, 5] * 2, "z": ["cat", "dog"] * 5} + ) + ddf = dd.from_pandas(pdf, npartitions=10).to_backend(backend) + + expect = pdf.sort_values(by=by, ascending=ascending) + got = dd.DataFrame.sort_values(ddf, by=by, ascending=ascending, shuffle="tasks") + dd.assert_eq(got, expect, sort_results=False) + + @pytest.mark.parametrize("ascending", [True, False, [False, True], [True, False]]) @pytest.mark.parametrize("by", [["a", "b"], ["b", "a"]]) @pytest.mark.parametrize("nelem", [10, 500])