Merge branch 'main' into fix-test_set_index
rjzamora authored Apr 8, 2024
2 parents 13dccd6 + c7de488 commit d602be2
Showing 8 changed files with 69 additions and 12 deletions.
8 changes: 4 additions & 4 deletions dask/bag/core.py
@@ -1624,12 +1624,12 @@ def to_dataframe(self, meta=None, columns=None, optimize_graph=True):
if not dd._dask_expr_enabled():
return dd.DataFrame(dsk, dfs.name, meta, divisions)
else:
from dask_expr import from_dask_dataframe
from dask_expr import from_legacy_dataframe

from dask.dataframe.core import DataFrame

df = DataFrame(dsk, dfs.name, meta, divisions)
return from_dask_dataframe(df)
return from_legacy_dataframe(df)

def to_delayed(self, optimize_graph=True):
"""Convert into a list of ``dask.delayed`` objects, one per partition.
@@ -1743,7 +1743,7 @@ def partition(grouper, sequence, npartitions, p, nelements=2**20):
d = groupby(grouper, block)
d2 = defaultdict(list)
for k, v in d.items():
d2[abs(hash(k)) % npartitions].extend(v)
d2[abs(int(tokenize(k), 16)) % npartitions].extend(v)
p.append(d2, fsync=True)
return p
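
The switch from the built-in hash to dask.base.tokenize makes the bucket a key is routed to reproducible across worker processes (built-in hash is salted for strings and, before Python 3.12, not stable for None). A hedged sketch of the routing rule used above:

    # Hedged sketch: deterministic routing of keys to partitions via tokenize.
    from dask.base import tokenize

    npartitions = 3
    for key in [None, "even", ("a", 1)]:
        bucket = abs(int(tokenize(key), 16)) % npartitions
        # The same key maps to the same bucket in every process, unlike hash().
        print(key, "->", bucket)
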

@@ -2363,7 +2363,7 @@ def h(x):
return h


def groupby_tasks(b, grouper, hash=hash, max_branch=32):
def groupby_tasks(b, grouper, hash=lambda x: int(tokenize(x), 16), max_branch=32):
max_branch = max_branch or 32
n = b.npartitions

46 changes: 41 additions & 5 deletions dask/bag/tests/test_bag.py
@@ -9,6 +9,7 @@
from bz2 import BZ2File
from collections.abc import Iterator
from concurrent.futures import ProcessPoolExecutor
from dataclasses import dataclass
from gzip import GzipFile
from itertools import repeat

@@ -65,7 +66,7 @@ def test_keys():
def test_bag_groupby_pure_hash():
# https://github.com/dask/dask/issues/6640
result = b.groupby(iseven).compute()
assert result == [(False, [1, 3] * 3), (True, [0, 2, 4] * 3)]
assert result == [(True, [0, 2, 4] * 3), (False, [1, 3] * 3)]


def test_bag_groupby_normal_hash():
@@ -76,6 +77,41 @@ def test_bag_groupby_normal_hash():
assert ("even", [0, 2, 4] * 3) in result


@pytest.mark.parametrize("shuffle", ["disk", "tasks"])
@pytest.mark.parametrize("scheduler", ["synchronous", "processes"])
def test_bag_groupby_none(shuffle, scheduler):
with dask.config.set(scheduler=scheduler):
seq = [(None, i) for i in range(50)]
b = db.from_sequence(seq).groupby(lambda x: x[0], shuffle=shuffle)
result = b.compute()
assert len(result) == 1


@dataclass(frozen=True)
class Key:
foo: int
bar: int | None = None


@pytest.mark.parametrize(
"key",
# if a value for `bar` is not explicitly passed, Key.bar will default to `None`,
# thereby introducing the risk of inter-process inconsistency for the value returned by
# built-in `hash` (due to lack of deterministic hashing for `None` prior to python 3.12).
# without https://github.com/dask/dask/pull/10734, this results in failures for this test.
[Key(foo=1), Key(foo=1, bar=2)],
ids=["none_field", "no_none_fields"],
)
@pytest.mark.parametrize("shuffle", ["disk", "tasks"])
@pytest.mark.parametrize("scheduler", ["synchronous", "processes"])
def test_bag_groupby_dataclass(key, shuffle, scheduler):
seq = [(key, i) for i in range(50)]
b = db.from_sequence(seq).groupby(lambda x: x[0], shuffle=shuffle)
with dask.config.set(scheduler=scheduler):
result = b.compute()
assert len(result) == 1
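
These new tests guard against keys whose built-in hash is not stable across worker processes (None before Python 3.12, and frozen dataclasses hashing a None field). A standalone sketch of the failure mode, separate from the test suite:

    # Hedged sketch: why built-in hash() is unsafe for cross-process shuffles.
    from __future__ import annotations

    from dataclasses import dataclass

    from dask.base import tokenize

    @dataclass(frozen=True)
    class Key:
        foo: int
        bar: int | None = None

    k = Key(foo=1)                   # bar defaults to None
    print(hash(k))                   # can differ between processes on Python < 3.12
    print(int(tokenize(k), 16) % 8)  # tokenize-based routing is the same everywhere
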


def test_bag_map():
b = db.from_sequence(range(100), npartitions=10)
b2 = db.from_sequence(range(100, 200), npartitions=10)
@@ -763,11 +799,11 @@ def test_product():
def test_partition_collect():
with partd.Pickle() as p:
partition(identity, range(6), 3, p)
assert set(p.get(0)) == {0, 3}
assert set(p.get(1)) == {1, 4}
assert set(p.get(2)) == {2, 5}
assert set(p.get(0)) == {3, 5}
assert set(p.get(1)) == {1}
assert set(p.get(2)) == {0, 2, 4}

assert sorted(collect(identity, 0, p, "")) == [(0, [0]), (3, [3])]
assert sorted(collect(identity, 2, p, "")) == [(0, [0]), (2, [2]), (4, [4])]
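
The updated expectations follow directly from the new routing rule in partition(); they can be re-derived rather than read off, for example:

    # Hedged sketch: recompute which of the 3 partitions each key 0..5 lands in
    # under abs(int(tokenize(k), 16)) % npartitions.
    from dask.base import tokenize

    assignment = {k: abs(int(tokenize(k), 16)) % 3 for k in range(6)}
    print(assignment)
    # Per the asserts above: {0: 2, 1: 1, 2: 2, 3: 0, 4: 2, 5: 0}
    # (exact values depend on the tokenize implementation)
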


def test_groupby():
1 change: 1 addition & 0 deletions dask/dataframe/__init__.py
@@ -123,6 +123,7 @@ def _dask_expr_enabled() -> bool:
from_delayed,
from_dict,
from_graph,
from_legacy_dataframe,
from_map,
from_pandas,
get_dummies,
2 changes: 1 addition & 1 deletion dask/dataframe/accessor.py
@@ -35,7 +35,7 @@ def func(self):
elif isinstance(original_prop, functools.cached_property):
method = original_prop.func
else:
raise TypeError("bind_property expects original class to provide a property")
method = original_prop
try:
func.__wrapped__ = method
except Exception:
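
The effect of this change is that an accessor attribute which is neither a property nor a functools.cached_property is now used directly as the method to wrap instead of raising TypeError. A minimal, hedged sketch of that selection logic (the helper name is illustrative, and the plain-property branch is assumed from surrounding context, not shown in this diff):

    # Hedged sketch: pick the callable whose docstring/signature gets copied.
    import functools

    def _unwrap_accessor_attribute(original_prop):
        if isinstance(original_prop, property):
            return original_prop.fget
        elif isinstance(original_prop, functools.cached_property):
            return original_prop.func
        # Fall back to the attribute itself (e.g. a plain method) rather than
        # raising TypeError, mirroring the change above.
        return original_prop
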
4 changes: 3 additions & 1 deletion dask/dataframe/backends.py
@@ -771,13 +771,15 @@ def to_backend(cls, data: _Frame, **kwargs):


@concat_dispatch.register_lazy("cudf")
@hash_object_dispatch.register_lazy("cudf")
@from_pyarrow_table_dispatch.register_lazy("cudf")
@group_split_dispatch.register_lazy("cudf")
@get_parallel_type.register_lazy("cudf")
@hash_object_dispatch.register_lazy("cudf")
@meta_nonempty.register_lazy("cudf")
@make_meta_dispatch.register_lazy("cudf")
@make_meta_obj.register_lazy("cudf")
@percentile_lookup.register_lazy("cudf")
@to_pyarrow_table_dispatch.register_lazy("cudf")
@tolist_dispatch.register_lazy("cudf")
def _register_cudf():
import dask_cudf # noqa: F401
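
register_lazy defers importing the cudf backend until an object from that library actually reaches one of these dispatch functions, so adding hash_object_dispatch and percentile_lookup here simply extends that lazy registration. A toy, hedged sketch of the pattern with dask.utils.Dispatch (not the real cudf registrations):

    # Hedged sketch: lazy backend registration with dask.utils.Dispatch.
    from dask.utils import Dispatch

    make_meta_example = Dispatch("make_meta_example")

    @make_meta_example.register(object)
    def _make_meta_object(x):
        return type(x)()

    @make_meta_example.register_lazy("cudf")
    def _register_cudf_example():
        # Runs (and imports cudf) only the first time a cudf object is dispatched.
        import cudf

        @make_meta_example.register(cudf.DataFrame)
        def _make_meta_cudf(df):
            return df.head(0)
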
17 changes: 17 additions & 0 deletions docs/source/changelog.rst
@@ -1,6 +1,23 @@
Changelog
=========

.. _v2024.4.1:

2024.4.1
--------

This is a minor bugfix release that fixes an error when importing
``dask.dataframe`` with Python 3.11.9.

See :pr:`11035` and :pr:`11039` from `Richard (Rick) Zamora`_ for details.

.. dropdown:: Additional changes

- Remove skips for named aggregations (:pr:`11036`) `Patrick Hoefler`_
- Don't deep-copy read-only buffers on unpickle (:pr-distributed:`8609`) `crusaderky`_
- Add ``dask-expr`` to ``dask`` conda recipe (:pr-distributed:`8601`) `Charles Blackmon-Luca`_


.. _v2024.4.0:

2024.4.0
1 change: 1 addition & 0 deletions docs/source/dataframe-api.rst
@@ -130,6 +130,7 @@ DataFrame
DataFrame.to_hdf
DataFrame.to_html
DataFrame.to_json
DataFrame.to_legacy_dataframe
DataFrame.to_parquet
DataFrame.to_records
DataFrame.to_string
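
to_legacy_dataframe is the counterpart of from_legacy_dataframe: it converts an expression-backed collection back to a legacy dask.dataframe collection. A hedged round-trip sketch, assuming query planning (dask-expr) is enabled; keyword arguments are not shown in this diff:

    # Hedged sketch: round trip between expression-backed and legacy collections.
    import pandas as pd

    import dask.dataframe as dd

    df = dd.from_pandas(pd.DataFrame({"x": range(10)}), npartitions=2)
    legacy = df.to_legacy_dataframe()              # legacy dask.dataframe collection
    roundtrip = dd.from_legacy_dataframe(legacy)   # back to the expression-backed API
    assert roundtrip.x.sum().compute() == sum(range(10))
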
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -57,7 +57,7 @@ dataframe = [
"pandas >= 1.3",
"dask-expr >= 1.0, <1.1", # dask-expr pins the dask version
]
distributed = ["distributed == 2024.4.0"]
distributed = ["distributed == 2024.4.1"]
diagnostics = [
"bokeh >= 2.4.2",
"jinja2 >= 2.10.3",
