Add hash_dataset (#319)
fjetter authored Aug 26, 2020
1 parent a547dc1 commit 721cb7f
Showing 8 changed files with 396 additions and 223 deletions.
6 changes: 6 additions & 0 deletions CHANGES.rst
@@ -5,6 +5,10 @@ Changelog
Version 3.14.0 (2020-XX-YY)
===========================

New functionality
^^^^^^^^^^^^^^^^^
* Add ``hash_dataset`` functionality

Improvements
^^^^^^^^^^^^

@@ -13,6 +17,7 @@ Improvements

Version 3.13.1 (2020-08-04)
===========================

* Fix evaluation of "OR"-connected predicates (#295)

Version 3.13.0 (2020-07-30)
@@ -35,6 +40,7 @@ New functionality
Delete (can delete entire datasets/cube), API, CLI, Core and IO features.
* Advanced Features - Multi-Dataset with Single Table, Explicit physical Partitions, Seed based join system.


Version 3.11.0 (2020-07-15)
===========================

11 changes: 11 additions & 0 deletions docs/api.rst
@@ -101,6 +101,17 @@ This is the most user friendly interface of the dask containers and offers direc

read_dataset_as_ddf
update_dataset_from_ddf
collect_dataset_metadata
hash_dataset

.. currentmodule:: kartothek.io.dask.compression

.. autosummary::

pack_payload_pandas
pack_payload
unpack_payload_pandas
unpack_payload

Bag
^^^
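The autosummary above now documents ``hash_dataset`` next to the existing dask.dataframe entry points. A hedged usage sketch follows; the parameter names (``store``, ``dataset_uuid``, ``subset``), the module path, and the returned dask Series are assumptions based on the usual kartothek conventions, since the implementation itself is not visible in the hunks shown here.

# Hypothetical usage sketch of hash_dataset; parameter names and the
# returned object are assumptions, not taken from this diff.
from functools import partial

from storefact import get_store_from_url
from kartothek.io.dask.dataframe import hash_dataset

store_factory = partial(get_store_from_url, "hfs:///tmp/ktk_store")

# Hash the rows of a stored dataset; optionally restrict to a column subset.
row_hashes = hash_dataset(
    store=store_factory,
    dataset_uuid="my_dataset",
    subset=["A", "B"],
).compute()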
140 changes: 3 additions & 137 deletions kartothek/io/dask/_update.py
@@ -1,6 +1,5 @@
import logging
from functools import partial
from typing import Callable, List, Optional, Union
from typing import Callable, List, Optional

import dask.array as da
import dask.dataframe as dd
@@ -9,6 +8,7 @@
from dask.delayed import Delayed
from simplekv import KeyValueStore

from kartothek.io.dask.compression import pack_payload, unpack_payload_pandas
from kartothek.io_components.metapartition import (
MetaPartition,
parse_input_to_metapartition,
@@ -19,11 +19,9 @@
from ._utils import map_delayed

StoreFactoryType = Callable[[], KeyValueStore]
_logger = logging.getLogger()

_KTK_HASH_BUCKET = "__KTK_HASH_BUCKET"

_PAYLOAD_COL = "__ktk_shuffle_payload"
_KTK_HASH_BUCKET = "__KTK_HASH_BUCKET"


def _hash_bucket(df: pd.DataFrame, subset: List[str], num_buckets: int):
@@ -43,136 +41,6 @@ def _hash_bucket(df: pd.DataFrame, subset: List[str], num_buckets: int):
return df.assign(**{_KTK_HASH_BUCKET: buckets.astype(f"uint{bit_width}")})

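(For orientation only: the body of ``_hash_bucket`` is elided by the hunk above, so the sketch below shows what such a hash-bucket assignment typically boils down to. ``hash_pandas_object`` and the bit-width handling are assumptions, not the verbatim kartothek code.)

import pandas as pd
from pandas.util import hash_pandas_object

def hash_bucket_sketch(df: pd.DataFrame, subset: list, num_buckets: int) -> pd.DataFrame:
    # Hash the subset columns and fold the hashes into num_buckets buckets.
    buckets = hash_pandas_object(df[subset], index=False) % num_buckets
    # Assumed: pick an unsigned integer width large enough for num_buckets - 1.
    bit_width = 8 if num_buckets <= 256 else 32
    return df.assign(**{"__KTK_HASH_BUCKET": buckets.astype(f"uint{bit_width}")})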

def pack_payload_pandas(partition: pd.DataFrame, group_key: List[str]) -> pd.DataFrame:
try:
# Technically distributed is an optional dependency
from distributed.protocol import serialize_bytes
except ImportError:
_logger.warning(
"Shuffle payload columns cannot be compressed since distributed is not installed."
)
return partition

if partition.empty:
res = partition[group_key]
res[_PAYLOAD_COL] = b""
else:
res = partition.groupby(
group_key,
sort=False,
observed=True,
# Keep the as_index s.t. the group values are not dropped. With this
# the behaviour seems to be consistent along pandas versions
as_index=True,
).apply(lambda x: pd.Series({_PAYLOAD_COL: serialize_bytes(x)}))

res = res.reset_index()
return res


def pack_payload(df: dd.DataFrame, group_key: Union[List[str], str]) -> dd.DataFrame:
"""
    Pack all payload columns (everything except the group_key) into a single
    column. This column will contain a single byte string containing the
serialized and compressed payload data. The payload data is just dead weight
when reshuffling. By compressing it once before the shuffle starts, this
saves a lot of memory and network/disk IO.
Example::
>>> import pandas as pd
... import dask.dataframe as dd
... from dask.dataframe.shuffle import pack_payload
...
... df = pd.DataFrame({"A": [1, 1] * 2 + [2, 2] * 2 + [3, 3] * 2, "B": range(12)})
... ddf = dd.from_pandas(df, npartitions=2)
>>> ddf.partitions[0].compute()
A B
0 1 0
1 1 1
2 1 2
3 1 3
4 2 4
5 2 5
>>> pack_payload(ddf, "A").partitions[0].compute()
A __dask_payload_bytes
0 1 b'\x03\x00\x00\x00\x00\x00\x00\x00)\x00\x00\x03...
1 2 b'\x03\x00\x00\x00\x00\x00\x00\x00)\x00\x00\x03...
See also https://github.com/dask/dask/pull/6259
"""

if (
# https://github.com/pandas-dev/pandas/issues/34455
isinstance(df._meta.index, pd.Float64Index)
        # TODO: Try to find out what's going on and file a bug report.
        # For datetime indices the apply seems to be corrupt
        # s.t. apply(lambda x: x) returns different values
or isinstance(df._meta.index, pd.DatetimeIndex)
):
return df

if not isinstance(group_key, list):
group_key = [group_key]

packed_meta = df._meta[group_key]
packed_meta[_PAYLOAD_COL] = b""

_pack_payload = partial(pack_payload_pandas, group_key=group_key)

return df.map_partitions(_pack_payload, meta=_pack_payload(df._meta))


def unpack_payload_pandas(
partition: pd.DataFrame, unpack_meta: pd.DataFrame
) -> pd.DataFrame:
"""
    Revert ``pack_payload_pandas`` and restore the packed payload.

    unpack_meta:
        A dataframe indicating the schema of the unpacked data. This will be
        returned in case the input is empty.
"""
try:
# Technically distributed is an optional dependency
from distributed.protocol import deserialize_bytes
except ImportError:
_logger.warning(
"Shuffle payload columns cannot be compressed since distributed is not installed."
)
return partition

if partition.empty:
return unpack_meta.iloc[:0]

mapped = partition[_PAYLOAD_COL].map(deserialize_bytes)

return pd.concat(mapped.values, copy=False, ignore_index=True)


def unpack_payload(df: dd.DataFrame, unpack_meta: pd.DataFrame) -> dd.DataFrame:
"""Revert payload packing of ``pack_payload`` and restores full dataframe."""

if (
# https://github.com/pandas-dev/pandas/issues/34455
isinstance(df._meta.index, pd.Float64Index)
        # TODO: Try to find out what's going on and file a bug report.
        # For datetime indices the apply seems to be corrupt
        # s.t. apply(lambda x: x) returns different values
or isinstance(df._meta.index, pd.DatetimeIndex)
):
return df

return df.map_partitions(
unpack_payload_pandas, unpack_meta=unpack_meta, meta=unpack_meta
)


def update_dask_partitions_shuffle(
ddf: dd.DataFrame,
table: str,
Expand Down Expand Up @@ -247,8 +115,6 @@ def update_dask_partitions_shuffle(
ddf = ddf.map_partitions(_hash_bucket, bucket_by, num_buckets, meta=meta)
group_cols.append(_KTK_HASH_BUCKET)

packed_meta = ddf._meta[group_cols]
packed_meta[_PAYLOAD_COL] = b""
unpacked_meta = ddf._meta

ddf = pack_payload(ddf, group_key=group_cols)
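The hunk above ends where ``update_dask_partitions_shuffle`` brackets the expensive shuffle with the new ``pack_payload``/``unpack_payload`` pair. A minimal sketch of that pattern follows; the explicit ``DataFrame.shuffle`` call is illustrative only (the real function continues with a groupby/apply that is cut off here), and it assumes ``distributed`` is installed so the payload actually gets serialized.

import dask.dataframe as dd
import pandas as pd

from kartothek.io.dask.compression import pack_payload, unpack_payload

df = pd.DataFrame({"part": [0, 1, 2, 3] * 5, "payload": range(20)})
ddf = dd.from_pandas(df, npartitions=4)

unpacked_meta = ddf._meta

packed = pack_payload(ddf, group_key=["part"])   # serialize the payload once
shuffled = packed.shuffle(on=["part"])           # move only the compact rows
restored = unpack_payload(shuffled, unpack_meta=unpacked_meta)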
139 changes: 139 additions & 0 deletions kartothek/io/dask/compression.py
@@ -0,0 +1,139 @@
import logging
from functools import partial
from typing import List, Union

import dask.dataframe as dd
import pandas as pd

_logger = logging.getLogger()
_PAYLOAD_COL = "__ktk_shuffle_payload"


def pack_payload_pandas(partition: pd.DataFrame, group_key: List[str]) -> pd.DataFrame:
try:
# Technically distributed is an optional dependency
from distributed.protocol import serialize_bytes
except ImportError:
_logger.warning(
"Shuffle payload columns cannot be compressed since distributed is not installed."
)
return partition

if partition.empty:
res = partition[group_key]
res[_PAYLOAD_COL] = b""
else:
res = partition.groupby(
group_key,
sort=False,
observed=True,
            # Keep as_index=True s.t. the group values are not dropped. With this
            # the behaviour seems to be consistent across pandas versions.
as_index=True,
).apply(lambda x: pd.Series({_PAYLOAD_COL: serialize_bytes(x)}))

res = res.reset_index()
return res


def pack_payload(df: dd.DataFrame, group_key: Union[List[str], str]) -> dd.DataFrame:
"""
    Pack all payload columns (everything except the group_key) into a single
    column. This column will contain a single byte string containing the
    serialized and compressed payload data. The payload data is just dead weight
    when reshuffling. By compressing it once before the shuffle starts, this
    saves a lot of memory and network/disk IO.

    Example::

        >>> import pandas as pd
        ... import dask.dataframe as dd
        ... from kartothek.io.dask.compression import pack_payload
        ...
        ... df = pd.DataFrame({"A": [1, 1] * 2 + [2, 2] * 2 + [3, 3] * 2, "B": range(12)})
        ... ddf = dd.from_pandas(df, npartitions=2)
        >>> ddf.partitions[0].compute()
           A  B
        0  1  0
        1  1  1
        2  1  2
        3  1  3
        4  2  4
        5  2  5
        >>> pack_payload(ddf, "A").partitions[0].compute()
           A                              __ktk_shuffle_payload
        0  1  b'\x03\x00\x00\x00\x00\x00\x00\x00)\x00\x00\x03...
        1  2  b'\x03\x00\x00\x00\x00\x00\x00\x00)\x00\x00\x03...

    See also https://github.com/dask/dask/pull/6259
"""

if (
# https://github.com/pandas-dev/pandas/issues/34455
isinstance(df._meta.index, pd.Float64Index)
        # TODO: Try to find out what's going on and file a bug report.
        # For datetime indices the apply seems to be corrupt
        # s.t. apply(lambda x: x) returns different values
or isinstance(df._meta.index, pd.DatetimeIndex)
):
return df

if not isinstance(group_key, list):
group_key = [group_key]

packed_meta = df._meta[group_key]
packed_meta[_PAYLOAD_COL] = b""

_pack_payload = partial(pack_payload_pandas, group_key=group_key)

return df.map_partitions(_pack_payload, meta=packed_meta)


def unpack_payload_pandas(
partition: pd.DataFrame, unpack_meta: pd.DataFrame
) -> pd.DataFrame:
"""
    Revert ``pack_payload_pandas`` and restore the packed payload.

    unpack_meta:
        A dataframe indicating the schema of the unpacked data. This will be
        returned in case the input is empty.
"""
try:
# Technically distributed is an optional dependency
from distributed.protocol import deserialize_bytes
except ImportError:
_logger.warning(
"Shuffle payload columns cannot be compressed since distributed is not installed."
)
return partition

if partition.empty:
return unpack_meta.iloc[:0]

mapped = partition[_PAYLOAD_COL].map(deserialize_bytes)

return pd.concat(mapped.values, copy=False, ignore_index=True)


def unpack_payload(df: dd.DataFrame, unpack_meta: pd.DataFrame) -> dd.DataFrame:
"""Revert payload packing of ``pack_payload`` and restores full dataframe."""

if (
# https://github.com/pandas-dev/pandas/issues/34455
isinstance(df._meta.index, pd.Float64Index)
        # TODO: Try to find out what's going on and file a bug report.
        # For datetime indices the apply seems to be corrupt
        # s.t. apply(lambda x: x) returns different values
or isinstance(df._meta.index, pd.DatetimeIndex)
):
return df

return df.map_partitions(
unpack_payload_pandas, unpack_meta=unpack_meta, meta=unpack_meta
)
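To make the new module concrete, here is a hedged pandas-level round trip through ``pack_payload_pandas`` and ``unpack_payload_pandas``. It assumes ``distributed`` is installed (otherwise both functions return their input unchanged, as the warnings above indicate); the dataframe and column names are illustrative.

import pandas as pd

from kartothek.io.dask.compression import (
    pack_payload_pandas,
    unpack_payload_pandas,
)

df = pd.DataFrame({"A": [1, 1, 2, 2], "B": range(4)})

# One row per group key value; each group's rows are serialized into
# the "__ktk_shuffle_payload" column.
packed = pack_payload_pandas(df, group_key=["A"])

# Restore the original columns; the empty frame only supplies the schema
# used when an input partition is empty.
restored = unpack_payload_pandas(packed, unpack_meta=df.iloc[:0])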
