Enable shuffle in build_cube_from_dataframe (#347)
* Enable shuffle in build_cube_from_dataframe

* Revert index hasnans raise

* Factor out core of store and update ddf

* Include store_dataset_from_ddf in shuffle tests
fjetter authored Sep 28, 2020
1 parent 555a01f commit 2eaa3ba
Showing 20 changed files with 832 additions and 507 deletions.
13 changes: 13 additions & 0 deletions CHANGES.rst
@@ -5,6 +5,19 @@ Changelog
Version 3.15.0 (unreleased)
===========================

New functionality
^^^^^^^^^^^^^^^^^
* Add :func:`~kartothek.io.dask.dataframe.store_dataset_from_ddf` to offer write
  support for a dask dataframe without update support. Overwrites are forbidden
  unless explicitly allowed, and existing datasets are never updated.
* The ``sort_partitions_by`` feature now supports multiple columns. While this
  has only a marginal effect on predicate pushdown, it can be used to improve
  Parquet compression.
* ``build_cube_from_dataframe`` now supports the ``shuffle`` methods offered by
  :func:`~kartothek.io.dask.dataframe.store_dataset_from_ddf` and
  :func:`~kartothek.io.dask.dataframe.update_dataset_from_ddf`, but writes the
  output in the cube format.

Improvements
^^^^^^^^^^^^
* Reduce memory consumption during index write.
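For readers of the changelog entry above, here is a minimal usage sketch of the new ``store_dataset_from_ddf`` entry point. This is an illustration only: the keyword names ``shuffle``, ``bucket_by``, ``num_buckets`` and ``overwrite`` are assumed to mirror ``update_dataset_from_ddf``, and the store URL is hypothetical.

    import dask.dataframe as dd
    import pandas as pd
    from functools import partial
    from storefact import get_store_from_url
    from kartothek.io.dask.dataframe import store_dataset_from_ddf

    # Toy dask dataframe with a partition column and a bucketing column.
    ddf = dd.from_pandas(
        pd.DataFrame({"part": ["a", "a", "b"], "user": [1, 2, 3], "value": [0.1, 0.2, 0.3]}),
        npartitions=2,
    )
    store_factory = partial(get_store_from_url, "hfs:///tmp/ktk_example")  # hypothetical path

    # Write (do not update) the dataset; with shuffle enabled the rows are hash-bucketed
    # on ``bucket_by`` before being written, as in shuffle_store_dask_partitions below.
    task = store_dataset_from_ddf(
        ddf,
        store=store_factory,
        dataset_uuid="example_dataset",
        table="table",
        partition_on=["part"],
        shuffle=True,
        bucket_by=["user"],
        num_buckets=2,
        overwrite=True,
    )
    task.compute()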
1 change: 1 addition & 0 deletions docs/api.rst
@@ -100,6 +100,7 @@ This is the most user friendly interface of the dask containers and offers direc
.. autosummary::

read_dataset_as_ddf
store_dataset_from_ddf
update_dataset_from_ddf
collect_dataset_metadata
hash_dataset
103 changes: 24 additions & 79 deletions kartothek/io/dask/_update.py → kartothek/io/dask/_shuffle.py
@@ -1,33 +1,27 @@
from functools import partial
from typing import List, Optional
from typing import List, Optional, Sequence, Union

import dask.array as da
import dask.dataframe as dd
import numpy as np
import pandas as pd
from dask.delayed import Delayed

from kartothek.core.typing import StoreFactory
from kartothek.io.dask.compression import pack_payload, unpack_payload_pandas
from kartothek.io_components.metapartition import (
MetaPartition,
parse_input_to_metapartition,
)
from kartothek.io_components.utils import sort_values_categorical
from kartothek.io_components.metapartition import MetaPartition
from kartothek.io_components.write import write_partition
from kartothek.serialization import DataFrameSerializer

from ._utils import map_delayed

_KTK_HASH_BUCKET = "__KTK_HASH_BUCKET"


def _hash_bucket(df: pd.DataFrame, subset: List[str], num_buckets: int):
def _hash_bucket(df: pd.DataFrame, subset: Optional[Sequence[str]], num_buckets: int):
"""
Categorize each row of `df` based on the data in the columns `subset`
into `num_buckets` values. This is based on `pandas.util.hash_pandas_object`
"""

if subset is None:
if not subset:
subset = df.columns
hash_arr = pd.util.hash_pandas_object(df[subset], index=False)
buckets = hash_arr % num_buckets
@@ -38,18 +32,18 @@ def _hash_bucket(df: pd.DataFrame, subset: List[str], num_buckets: int):
return df.assign(**{_KTK_HASH_BUCKET: buckets.astype(f"uint{bit_width}")})
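As a standalone illustration of the bucketing step shown above (plain pandas, not the kartothek API): each row is hashed on a subset of columns with ``pandas.util.hash_pandas_object`` and the hash is taken modulo ``num_buckets``, yielding a stable bucket id that later drives the groupby/shuffle.

    import pandas as pd

    df = pd.DataFrame({"user": [1, 2, 3, 4], "value": list("abcd")})
    num_buckets = 2

    # Hash the subset columns row-wise (ignoring the index) and fold the hash
    # into one of num_buckets buckets, as _hash_bucket does.
    hash_arr = pd.util.hash_pandas_object(df[["user"]], index=False)
    buckets = (hash_arr % num_buckets).astype("uint8")  # the real code picks the minimal uint width
    df = df.assign(__KTK_HASH_BUCKET=buckets)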


def update_dask_partitions_shuffle(
def shuffle_store_dask_partitions(
ddf: dd.DataFrame,
table: str,
secondary_indices: List[str],
secondary_indices: Optional[Union[str, Sequence[str]]],
metadata_version: int,
partition_on: List[str],
store_factory: StoreFactory,
df_serializer: DataFrameSerializer,
df_serializer: Optional[DataFrameSerializer],
dataset_uuid: str,
num_buckets: int,
sort_partitions_by: Optional[str],
bucket_by: List[str],
sort_partitions_by: List[str],
bucket_by: Sequence[str],
) -> da.Array:
"""
Perform a dataset update with dask reshuffling to control partitioning.
@@ -118,7 +112,7 @@ def update_dask_partitions_shuffle(
ddf = ddf.groupby(by=group_cols)
ddf = ddf.apply(
partial(
_store_partition,
_unpack_store_partition,
secondary_indices=secondary_indices,
sort_partitions_by=sort_partitions_by,
table=table,
@@ -134,55 +128,10 @@ def update_dask_partitions_shuffle(
return ddf


def update_dask_partitions_one_to_one(
delayed_tasks: List[Delayed],
secondary_indices: List[str],
metadata_version: int,
partition_on: List[str],
store_factory: StoreFactory,
df_serializer: DataFrameSerializer,
dataset_uuid: str,
sort_partitions_by: Optional[str],
) -> List[Delayed]:
"""
Perform an ordinary, partition wise update where the usual partition
pre-store processing is applied to every dask internal partition.
"""
input_to_mps = partial(
parse_input_to_metapartition,
metadata_version=metadata_version,
expected_secondary_indices=secondary_indices,
)
mps = map_delayed(input_to_mps, delayed_tasks)

if sort_partitions_by:
mps = map_delayed(
partial(
MetaPartition.apply,
# FIXME: Type checks collide with partial. We should rename
# apply func kwarg
**{"func": partial(sort_values_categorical, column=sort_partitions_by)},
),
mps,
)
if partition_on:
mps = map_delayed(MetaPartition.partition_on, mps, partition_on=partition_on)
if secondary_indices:
mps = map_delayed(MetaPartition.build_indices, mps, columns=secondary_indices)

return map_delayed(
MetaPartition.store_dataframes,
mps,
store=store_factory,
df_serializer=df_serializer,
dataset_uuid=dataset_uuid,
)


def _store_partition(
def _unpack_store_partition(
df: pd.DataFrame,
secondary_indices: List[str],
sort_partitions_by: Optional[str],
sort_partitions_by: List[str],
table: str,
dataset_uuid: str,
partition_on: Optional[List[str]],
@@ -191,22 +140,18 @@ def _store_partition(
metadata_version: int,
unpacked_meta: pd.DataFrame,
) -> MetaPartition:
"""Unpack payload data and store partition"""
df = unpack_payload_pandas(df, unpacked_meta)
if _KTK_HASH_BUCKET in df:
df = df.drop(_KTK_HASH_BUCKET, axis=1)
store = store_factory()
# I don't have access to the group values
mps = parse_input_to_metapartition(
{"data": {table: df}}, metadata_version=metadata_version
)
# delete reference to enable release after partition_on; before index build
del df
if sort_partitions_by:
mps = mps.apply(partial(sort_values_categorical, column=sort_partitions_by))
if partition_on:
mps = mps.partition_on(partition_on)
if secondary_indices:
mps = mps.build_indices(secondary_indices)
return mps.store_dataframes(
store=store, dataset_uuid=dataset_uuid, df_serializer=df_serializer
return write_partition(
partition_df=df,
secondary_indices=secondary_indices,
sort_partitions_by=sort_partitions_by,
dataset_table_name=table,
dataset_uuid=dataset_uuid,
partition_on=partition_on,
store_factory=store_factory,
df_serializer=df_serializer,
metadata_version=metadata_version,
)
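Tying the refactor back to the commit title, a hedged sketch of how the cube writer might be driven with the new shuffle options. The module path and the keyword names ``shuffle``, ``num_buckets`` and ``bucket_by`` are assumptions based on the changelog entry and the usual kartothek layout; check them against the repository before use.

    import dask.dataframe as dd
    import pandas as pd
    from functools import partial
    from storefact import get_store_from_url
    from kartothek.core.cube.cube import Cube
    from kartothek.io.dask.dataframe_cube import build_cube_from_dataframe

    df = pd.DataFrame({"part": ["a", "a", "b"], "user": [1, 2, 3], "value": [0.1, 0.2, 0.3]})
    ddf = dd.from_pandas(df, npartitions=2)

    cube = Cube(
        dimension_columns=["user"],
        partition_columns=["part"],
        uuid_prefix="example_cube",
    )
    store_factory = partial(get_store_from_url, "hfs:///tmp/ktk_cube_example")  # hypothetical path

    # With shuffle enabled the cube data is hash-bucketed on bucket_by before
    # writing, reusing the shuffle machinery from _shuffle.py shown above.
    task = build_cube_from_dataframe(
        ddf,
        cube=cube,
        store=store_factory,
        shuffle=True,
        num_buckets=2,
        bucket_by=["user"],
    )
    task.compute()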
