Commit

support cudf backend in dd.DataFrame.sort_values
rjzamora committed Oct 9, 2023
1 parent 2b45b21 commit 142579f
Showing 4 changed files with 42 additions and 11 deletions.
10 changes: 8 additions & 2 deletions dask/dataframe/backends.py
@@ -703,8 +703,8 @@ def categorical_dtype_pandas(categories=None, ordered=False):
     return pd.api.types.CategoricalDtype(categories=categories, ordered=ordered)
 
 
-@tolist_dispatch.register((pd.Series, pd.Index, pd.Categorical))
-def tolist_pandas(obj):
+@tolist_dispatch.register((np.ndarray, pd.Series, pd.Index, pd.Categorical))
+def tolist_numpy_or_pandas(obj):
     return obj.tolist()
 
 
@@ -770,11 +770,13 @@ def to_backend(cls, data: _Frame, **kwargs):
 @make_meta_dispatch.register_lazy("cudf")
 @make_meta_obj.register_lazy("cudf")
 @percentile_lookup.register_lazy("cudf")
+@tolist_dispatch.register_lazy("cudf")
 def _register_cudf():
     import dask_cudf  # noqa: F401
 
 
 @meta_lib_from_array.register_lazy("cupy")
+@tolist_dispatch.register_lazy("cupy")
 def _register_cupy_to_cudf():
     # Handle cupy.ndarray -> cudf.DataFrame dispatching
     try:
@@ -786,5 +788,9 @@ def meta_lib_from_array_cupy(x):
             # cupy -> cudf
             return cudf
 
+        @tolist_dispatch.register(cupy.ndarray)
+        def tolist_cupy(x):
+            return x.tolist()
+
     except ImportError:
         pass
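
Note: tolist_dispatch is a dask.utils.Dispatch object, so each backend registers its own tolist implementation and the right one is picked by the type of the first argument; register_lazy defers the backend import until a matching type is first dispatched on. A minimal, self-contained sketch of that pattern (the my_tolist dispatcher and its function names are illustrative, not part of this commit):

    import numpy as np
    import pandas as pd

    from dask.utils import Dispatch

    # Stand-in for dask.dataframe.dispatch.tolist_dispatch.
    my_tolist = Dispatch("my_tolist")

    @my_tolist.register((np.ndarray, pd.Series, pd.Index, pd.Categorical))
    def my_tolist_numpy_or_pandas(obj):
        # NumPy and pandas objects share a .tolist() method.
        return obj.tolist()

    @my_tolist.register_lazy("cupy")
    def _register_cupy():
        # Runs the first time an object from the "cupy" module is dispatched on.
        import cupy

        @my_tolist.register(cupy.ndarray)
        def my_tolist_cupy(x):
            return x.tolist()

    print(my_tolist(pd.Series([1, 2, 3])))  # [1, 2, 3]
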
22 changes: 15 additions & 7 deletions dask/dataframe/partitionquantiles.py
@@ -79,6 +79,7 @@
 
 from dask.base import tokenize
 from dask.dataframe.core import Series
+from dask.dataframe.dispatch import tolist_dispatch
 from dask.utils import is_cupy_type, random_state_data
 
 
@@ -262,7 +263,7 @@ def percentiles_to_weights(qs, vals, length):
         return ()
     diff = np.ediff1d(qs, 0.0, 0.0)
     weights = 0.5 * length * (diff[1:] + diff[:-1])
-    return vals.tolist(), weights.tolist()
+    return tolist_dispatch(vals), weights.tolist()
 
 
 def merge_and_compress_summaries(vals_and_weights):
@@ -408,9 +409,6 @@ def percentiles_summary(df, num_old, num_new, upsample, state):
         Scale factor to increase the number of percentiles calculated in
         each partition. Use to improve accuracy.
     """
-    from dask.array.dispatch import percentile_lookup as _percentile
-    from dask.array.utils import array_safe
-
     length = len(df)
     if length == 0:
         return ()
@@ -425,12 +423,22 @@ def percentiles_summary(df, num_old, num_new, upsample, state):
     elif is_datetime64_dtype(data.dtype) or is_integer_dtype(data.dtype):
         interpolation = "nearest"
 
-    # FIXME: pandas quantile doesn't work with some data types (e.g. strings).
-    # We fall back to an ndarray as a workaround.
+    # FIXME: Series.quantile doesn't work with some data types (e.g. strings).
+    # We try using DataFrame.quantile(..., method="table") as a workaround.
     try:
         vals = data.quantile(q=qs / 100, interpolation=interpolation).values
     except (TypeError, NotImplementedError):
-        vals, _ = _percentile(array_safe(data, like=data.values), qs, interpolation)
+        interpolation = "nearest"
+        vals = (
+            data.to_frame()
+            .quantile(
+                q=qs / 100,
+                interpolation=interpolation,
+                numeric_only=False,
+                method="table",
+            )
+            .iloc[:, 0]
+        )
 
     if (
         is_cupy_type(data)
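
Note: the fallback above leans on pandas' table-wise quantile. Series.quantile raises for non-numeric data such as strings, while DataFrame.quantile(..., method="table") in newer pandas releases sorts whole rows and selects existing entries by position, which also works for object dtypes when interpolation is "nearest", "lower", or "higher". A rough standalone illustration of the same pattern (the sample data is made up):

    import numpy as np
    import pandas as pd

    data = pd.Series(["dog", "cat", "ant", "eel", "fox"])
    qs = np.array([0, 25, 50, 75, 100])
    interpolation = "linear"

    try:
        vals = data.quantile(q=qs / 100, interpolation=interpolation).values
    except (TypeError, NotImplementedError):
        # Table-wise quantile picks existing rows instead of interpolating
        # between values, which is what makes string data workable here.
        interpolation = "nearest"
        vals = (
            data.to_frame()
            .quantile(
                q=qs / 100,
                interpolation=interpolation,
                numeric_only=False,
                method="table",
            )
            .iloc[:, 0]
        )

    print(list(vals))  # e.g. ['ant', 'cat', 'dog', 'eel', 'fox']
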
5 changes: 3 additions & 2 deletions dask/dataframe/shuffle.py
@@ -120,13 +120,13 @@ def sort_values(
     df: DataFrame,
     by: str | list[str],
     npartitions: int | Literal["auto"] | None = None,
+    shuffle: str | None = None,
     ascending: bool | list[bool] = True,
     na_position: Literal["first"] | Literal["last"] = "last",
     upsample: float = 1.0,
     partition_size: float = 128e6,
     sort_function: Callable[[pd.DataFrame], pd.DataFrame] | None = None,
     sort_function_kwargs: Mapping[str, Any] | None = None,
-    **kwargs,
 ) -> DataFrame:
     """See DataFrame.sort_values for docstring"""
     if na_position not in ("first", "last"):
@@ -176,7 +176,7 @@ def sort_values(
             f"Dask currently only supports a single boolean for ascending. You passed {str(ascending)}"
         )
 
-    divisions, mins, maxes, presorted = _calculate_divisions(
+    divisions, _, _, presorted = _calculate_divisions(
         df, sort_by_col, repartition, npartitions, upsample, partition_size, ascending
     )
 
@@ -193,6 +193,7 @@ def sort_values(
         df,
         by[0],
         divisions,
+        shuffle=shuffle,
         ascending=ascending,
         na_position=na_position,
         duplicates=False,
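
Note: replacing **kwargs with an explicit shuffle argument means the requested shuffle method is forwarded to rearrange_by_divisions rather than absorbed by **kwargs. A minimal usage sketch, assuming the default local scheduler (the toy frame is made up for the example):

    import pandas as pd

    import dask.dataframe as dd

    pdf = pd.DataFrame({"x": [3, 1, 2, 5, 4], "y": list("abcde")})
    ddf = dd.from_pandas(pdf, npartitions=2)

    # The shuffle method (e.g. "tasks" or "disk") is now forwarded explicitly.
    result = ddf.sort_values(by="x", shuffle="tasks").compute()
    print(result)
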
16 changes: 16 additions & 0 deletions dask/dataframe/tests/test_shuffle.py
@@ -1482,6 +1482,22 @@ def test_sort_values(nelem, by, ascending):
     dd.assert_eq(got, expect, check_index=False, sort_results=False)
 
 
+@pytest.mark.parametrize(
+    "backend", ["pandas", pytest.param("cudf", marks=pytest.mark.gpu)]
+)
+@pytest.mark.parametrize("by", ["x", "z", ["x", "z"], ["z", "x"]])
+@pytest.mark.parametrize("ascending", [True, False])
+def test_sort_values_tasks_backend(backend, by, ascending):
+    pdf = pd.DataFrame(
+        {"x": range(10), "y": [1, 2, 3, 4, 5] * 2, "z": ["cat", "dog"] * 5}
+    )
+    ddf = dd.from_pandas(pdf, npartitions=10).to_backend(backend)
+
+    expect = pdf.sort_values(by=by, ascending=ascending)
+    got = dd.DataFrame.sort_values(ddf, by=by, ascending=ascending, shuffle="tasks")
+    dd.assert_eq(got, expect, sort_results=False)
+
+
 @pytest.mark.parametrize("ascending", [True, False, [False, True], [True, False]])
 @pytest.mark.parametrize("by", [["a", "b"], ["b", "a"]])
 @pytest.mark.parametrize("nelem", [10, 500])
