diff --git a/dask/bag/core.py b/dask/bag/core.py
index 43e347c73e1..527b48b9a08 100644
--- a/dask/bag/core.py
+++ b/dask/bag/core.py
@@ -1624,12 +1624,12 @@ def to_dataframe(self, meta=None, columns=None, optimize_graph=True):
         if not dd._dask_expr_enabled():
             return dd.DataFrame(dsk, dfs.name, meta, divisions)
         else:
-            from dask_expr import from_dask_dataframe
+            from dask_expr import from_legacy_dataframe
 
             from dask.dataframe.core import DataFrame
 
             df = DataFrame(dsk, dfs.name, meta, divisions)
-            return from_dask_dataframe(df)
+            return from_legacy_dataframe(df)
 
     def to_delayed(self, optimize_graph=True):
         """Convert into a list of ``dask.delayed`` objects, one per partition.
@@ -1743,7 +1743,7 @@ def partition(grouper, sequence, npartitions, p, nelements=2**20):
         d = groupby(grouper, block)
         d2 = defaultdict(list)
         for k, v in d.items():
-            d2[abs(hash(k)) % npartitions].extend(v)
+            d2[abs(int(tokenize(k), 16)) % npartitions].extend(v)
         p.append(d2, fsync=True)
     return p
 
@@ -2363,7 +2363,7 @@ def h(x):
     return h
 
 
-def groupby_tasks(b, grouper, hash=hash, max_branch=32):
+def groupby_tasks(b, grouper, hash=lambda x: int(tokenize(x), 16), max_branch=32):
     max_branch = max_branch or 32
     n = b.npartitions
 
diff --git a/dask/bag/tests/test_bag.py b/dask/bag/tests/test_bag.py
index d00addb5dc9..45e3bacafe0 100644
--- a/dask/bag/tests/test_bag.py
+++ b/dask/bag/tests/test_bag.py
@@ -9,6 +9,7 @@
 from bz2 import BZ2File
 from collections.abc import Iterator
 from concurrent.futures import ProcessPoolExecutor
+from dataclasses import dataclass
 from gzip import GzipFile
 from itertools import repeat
 
@@ -65,7 +66,7 @@ def test_keys():
 def test_bag_groupby_pure_hash():
     # https://github.com/dask/dask/issues/6640
     result = b.groupby(iseven).compute()
-    assert result == [(False, [1, 3] * 3), (True, [0, 2, 4] * 3)]
+    assert result == [(True, [0, 2, 4] * 3), (False, [1, 3] * 3)]
 
 
 def test_bag_groupby_normal_hash():
@@ -76,6 +77,41 @@ def test_bag_groupby_normal_hash():
     assert ("even", [0, 2, 4] * 3) in result
 
 
+@pytest.mark.parametrize("shuffle", ["disk", "tasks"])
+@pytest.mark.parametrize("scheduler", ["synchronous", "processes"])
+def test_bag_groupby_none(shuffle, scheduler):
+    with dask.config.set(scheduler=scheduler):
+        seq = [(None, i) for i in range(50)]
+        b = db.from_sequence(seq).groupby(lambda x: x[0], shuffle=shuffle)
+        result = b.compute()
+        assert len(result) == 1
+
+
+@dataclass(frozen=True)
+class Key:
+    foo: int
+    bar: int | None = None
+
+
+@pytest.mark.parametrize(
+    "key",
+    # If no value is passed for `bar`, Key.bar defaults to `None`, and before
+    # Python 3.12 the built-in `hash` of `None` was not deterministic across
+    # processes. Without https://github.com/dask/dask/pull/10734, that
+    # inter-process inconsistency makes this test fail.
+    [Key(foo=1), Key(foo=1, bar=2)],
+    ids=["none_field", "no_none_fields"],
+)
+@pytest.mark.parametrize("shuffle", ["disk", "tasks"])
+@pytest.mark.parametrize("scheduler", ["synchronous", "processes"])
+def test_bag_groupby_dataclass(key, shuffle, scheduler):
+    seq = [(key, i) for i in range(50)]
+    b = db.from_sequence(seq).groupby(lambda x: x[0], shuffle=shuffle)
+    with dask.config.set(scheduler=scheduler):
+        result = b.compute()
+        assert len(result) == 1
+
+
 def test_bag_map():
     b = db.from_sequence(range(100), npartitions=10)
     b2 = db.from_sequence(range(100, 200), npartitions=10)
@@ -763,11 +799,11 @@ def test_product():
 def test_partition_collect():
     with partd.Pickle() as p:
         partition(identity, range(6), 3, p)
-        assert set(p.get(0)) == {0, 3}
-        assert set(p.get(1)) == {1, 4}
-        assert set(p.get(2)) == {2, 5}
+        assert set(p.get(0)) == {3, 5}
+        assert set(p.get(1)) == {1}
+        assert set(p.get(2)) == {0, 2, 4}
 
-        assert sorted(collect(identity, 0, p, "")) == [(0, [0]), (3, [3])]
+        assert sorted(collect(identity, 2, p, "")) == [(0, [0]), (2, [2]), (4, [4])]
 
 
 def test_groupby():
diff --git a/dask/dataframe/__init__.py b/dask/dataframe/__init__.py
index 815a26c001c..08949b829b8 100644
--- a/dask/dataframe/__init__.py
+++ b/dask/dataframe/__init__.py
@@ -123,6 +123,7 @@ def _dask_expr_enabled() -> bool:
         from_delayed,
         from_dict,
         from_graph,
+        from_legacy_dataframe,
         from_map,
         from_pandas,
         get_dummies,
diff --git a/dask/dataframe/accessor.py b/dask/dataframe/accessor.py
index 0bb10e98151..54f72cd9a1f 100644
--- a/dask/dataframe/accessor.py
+++ b/dask/dataframe/accessor.py
@@ -35,7 +35,7 @@ def func(self):
     elif isinstance(original_prop, functools.cached_property):
         method = original_prop.func
     else:
-        raise TypeError("bind_property expects original class to provide a property")
+        method = original_prop
     try:
         func.__wrapped__ = method
     except Exception:
diff --git a/dask/dataframe/backends.py b/dask/dataframe/backends.py
index df5a4035a85..48fd4ea109c 100644
--- a/dask/dataframe/backends.py
+++ b/dask/dataframe/backends.py
@@ -771,13 +771,15 @@ def to_backend(cls, data: _Frame, **kwargs):
 
 
 @concat_dispatch.register_lazy("cudf")
-@hash_object_dispatch.register_lazy("cudf")
+@from_pyarrow_table_dispatch.register_lazy("cudf")
 @group_split_dispatch.register_lazy("cudf")
 @get_parallel_type.register_lazy("cudf")
+@hash_object_dispatch.register_lazy("cudf")
 @meta_nonempty.register_lazy("cudf")
 @make_meta_dispatch.register_lazy("cudf")
 @make_meta_obj.register_lazy("cudf")
 @percentile_lookup.register_lazy("cudf")
+@to_pyarrow_table_dispatch.register_lazy("cudf")
 @tolist_dispatch.register_lazy("cudf")
 def _register_cudf():
     import dask_cudf  # noqa: F401
diff --git a/docs/source/changelog.rst b/docs/source/changelog.rst
index dcd637132d6..324c0a2698e 100644
--- a/docs/source/changelog.rst
+++ b/docs/source/changelog.rst
@@ -1,6 +1,23 @@
 Changelog
 =========
 
+.. _v2024.4.1:
+
+2024.4.1
+--------
+
+This is a minor bugfix release that fixes an error when importing
+``dask.dataframe`` with Python 3.11.9.
+
+See :pr:`11035` and :pr:`11039` from `Richard (Rick) Zamora`_ for details.
+
+.. dropdown:: Additional changes
+
+    - Remove skips for named aggregations (:pr:`11036`) `Patrick Hoefler`_
+    - Don't deep-copy read-only buffers on unpickle (:pr-distributed:`8609`) `crusaderky`_
+    - Add ``dask-expr`` to ``dask`` conda recipe (:pr-distributed:`8601`) `Charles Blackmon-Luca`_
+
+
 .. _v2024.4.0:
 
 2024.4.0
diff --git a/docs/source/dataframe-api.rst b/docs/source/dataframe-api.rst
index f364ae6a8df..bcd1cbbe1c8 100644
--- a/docs/source/dataframe-api.rst
+++ b/docs/source/dataframe-api.rst
@@ -130,6 +130,7 @@ DataFrame
    DataFrame.to_hdf
    DataFrame.to_html
    DataFrame.to_json
+   DataFrame.to_legacy_dataframe
    DataFrame.to_parquet
    DataFrame.to_records
    DataFrame.to_string
diff --git a/pyproject.toml b/pyproject.toml
index 2a00d3826ef..0b3097080c6 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -57,7 +57,7 @@ dataframe = [
     "pandas >= 1.3",
     "dask-expr >= 1.0, <1.1",  # dask-expr pins the dask version
 ]
-distributed = ["distributed == 2024.4.0"]
+distributed = ["distributed == 2024.4.1"]
 diagnostics = [
     "bokeh >= 2.4.2",
     "jinja2 >= 2.10.3",
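
Why the core change above swaps the built-in ``hash`` for ``tokenize``: ``hash()`` is salted per process for strings via ``PYTHONHASHSEED``, and before Python 3.12 ``hash(None)`` depended on the object's memory address, so the same key could be routed to different partitions by different worker processes. Below is a minimal sketch of the token-based mapping the patch introduces, not part of the patch itself; ``stable_bucket`` is a hypothetical helper name used only for illustration.

from concurrent.futures import ProcessPoolExecutor

from dask.base import tokenize


def stable_bucket(key, npartitions):
    # The mapping now used in dask.bag.core.partition:
    # deterministic hex digest -> integer -> bucket index.
    return abs(int(tokenize(key), 16)) % npartitions


if __name__ == "__main__":
    # The token-based bucket matches between the parent interpreter and a
    # freshly spawned worker, whereas abs(hash(None)) % 3 can differ between
    # processes on Python < 3.12.
    with ProcessPoolExecutor(max_workers=1) as pool:
        child = pool.submit(stable_bucket, None, 3).result()
    assert child == stable_bucket(None, 3)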