diff --git a/CHANGES.rst b/CHANGES.rst
index d18f5163..39946ac7 100644
--- a/CHANGES.rst
+++ b/CHANGES.rst
@@ -5,6 +5,19 @@ Changelog
 Version 3.15.0 (unreleased)
 ===========================
 
+New functionality
+^^^^^^^^^^^^^^^^^
+* Add :func:`~kartothek.io.dask.dataframe.store_dataset_from_ddf` to write a
+  dask dataframe as a new dataset. In contrast to the update pipeline, existing
+  datasets are never modified and overwrites must be requested explicitly.
+* The ``sort_partitions_by`` feature now supports multiple columns. While this
+  has only a marginal effect on predicate pushdown, it may be used to improve
+  Parquet compression.
+* ``build_cube_from_dataframe`` now supports the ``shuffle`` options offered by
+  :func:`~kartothek.io.dask.dataframe.store_dataset_from_ddf` and
+  :func:`~kartothek.io.dask.dataframe.update_dataset_from_ddf` but writes the
+  output in the cube format.
+
 Improvements
 ^^^^^^^^^^^^
 * Reduce memory consumption during index write.
diff --git a/docs/api.rst b/docs/api.rst
index 31222ab4..a4617f94 100644
--- a/docs/api.rst
+++ b/docs/api.rst
@@ -100,6 +100,7 @@ This is the most user friendly interface of the dask containers and offers direc
 .. autosummary::
 
     read_dataset_as_ddf
+    store_dataset_from_ddf
     update_dataset_from_ddf
     collect_dataset_metadata
     hash_dataset
diff --git a/kartothek/io/dask/_update.py b/kartothek/io/dask/_shuffle.py
similarity index 64%
rename from kartothek/io/dask/_update.py
rename to kartothek/io/dask/_shuffle.py
index 4ca503fb..08f434c9 100644
--- a/kartothek/io/dask/_update.py
+++ b/kartothek/io/dask/_shuffle.py
@@ -1,33 +1,27 @@
 from functools import partial
-from typing import List, Optional
+from typing import List, Optional, Sequence, Union
 
 import dask.array as da
 import dask.dataframe as dd
 import numpy as np
 import pandas as pd
-from dask.delayed import Delayed
 
 from kartothek.core.typing import StoreFactory
 from kartothek.io.dask.compression import pack_payload, unpack_payload_pandas
-from kartothek.io_components.metapartition import (
-    MetaPartition,
-    parse_input_to_metapartition,
-)
-from kartothek.io_components.utils import sort_values_categorical
+from kartothek.io_components.metapartition import MetaPartition
+from kartothek.io_components.write import write_partition
 from kartothek.serialization import DataFrameSerializer
 
-from ._utils import map_delayed
-
 _KTK_HASH_BUCKET = "__KTK_HASH_BUCKET"
 
 
-def _hash_bucket(df: pd.DataFrame, subset: List[str], num_buckets: int):
+def _hash_bucket(df: pd.DataFrame, subset: Optional[Sequence[str]], num_buckets: int):
     """
     Categorize each row of `df` based on the data in the columns `subset`
     into `num_buckets` values.
This is based on `pandas.util.hash_pandas_object` """ - if subset is None: + if not subset: subset = df.columns hash_arr = pd.util.hash_pandas_object(df[subset], index=False) buckets = hash_arr % num_buckets @@ -38,18 +32,18 @@ def _hash_bucket(df: pd.DataFrame, subset: List[str], num_buckets: int): return df.assign(**{_KTK_HASH_BUCKET: buckets.astype(f"uint{bit_width}")}) -def update_dask_partitions_shuffle( +def shuffle_store_dask_partitions( ddf: dd.DataFrame, table: str, - secondary_indices: List[str], + secondary_indices: Optional[Union[str, Sequence[str]]], metadata_version: int, partition_on: List[str], store_factory: StoreFactory, - df_serializer: DataFrameSerializer, + df_serializer: Optional[DataFrameSerializer], dataset_uuid: str, num_buckets: int, - sort_partitions_by: Optional[str], - bucket_by: List[str], + sort_partitions_by: List[str], + bucket_by: Sequence[str], ) -> da.Array: """ Perform a dataset update with dask reshuffling to control partitioning. @@ -118,7 +112,7 @@ def update_dask_partitions_shuffle( ddf = ddf.groupby(by=group_cols) ddf = ddf.apply( partial( - _store_partition, + _unpack_store_partition, secondary_indices=secondary_indices, sort_partitions_by=sort_partitions_by, table=table, @@ -134,55 +128,10 @@ def update_dask_partitions_shuffle( return ddf -def update_dask_partitions_one_to_one( - delayed_tasks: List[Delayed], - secondary_indices: List[str], - metadata_version: int, - partition_on: List[str], - store_factory: StoreFactory, - df_serializer: DataFrameSerializer, - dataset_uuid: str, - sort_partitions_by: Optional[str], -) -> List[Delayed]: - """ - Perform an ordinary, partition wise update where the usual partition - pre-store processing is applied to every dask internal partition. - """ - input_to_mps = partial( - parse_input_to_metapartition, - metadata_version=metadata_version, - expected_secondary_indices=secondary_indices, - ) - mps = map_delayed(input_to_mps, delayed_tasks) - - if sort_partitions_by: - mps = map_delayed( - partial( - MetaPartition.apply, - # FIXME: Type checks collide with partial. 
We should rename - # apply func kwarg - **{"func": partial(sort_values_categorical, column=sort_partitions_by)}, - ), - mps, - ) - if partition_on: - mps = map_delayed(MetaPartition.partition_on, mps, partition_on=partition_on) - if secondary_indices: - mps = map_delayed(MetaPartition.build_indices, mps, columns=secondary_indices) - - return map_delayed( - MetaPartition.store_dataframes, - mps, - store=store_factory, - df_serializer=df_serializer, - dataset_uuid=dataset_uuid, - ) - - -def _store_partition( +def _unpack_store_partition( df: pd.DataFrame, secondary_indices: List[str], - sort_partitions_by: Optional[str], + sort_partitions_by: List[str], table: str, dataset_uuid: str, partition_on: Optional[List[str]], @@ -191,22 +140,18 @@ def _store_partition( metadata_version: int, unpacked_meta: pd.DataFrame, ) -> MetaPartition: + """Unpack payload data and store partition""" df = unpack_payload_pandas(df, unpacked_meta) if _KTK_HASH_BUCKET in df: df = df.drop(_KTK_HASH_BUCKET, axis=1) - store = store_factory() - # I don't have access to the group values - mps = parse_input_to_metapartition( - {"data": {table: df}}, metadata_version=metadata_version - ) - # delete reference to enable release after partition_on; before index build - del df - if sort_partitions_by: - mps = mps.apply(partial(sort_values_categorical, column=sort_partitions_by)) - if partition_on: - mps = mps.partition_on(partition_on) - if secondary_indices: - mps = mps.build_indices(secondary_indices) - return mps.store_dataframes( - store=store, dataset_uuid=dataset_uuid, df_serializer=df_serializer + return write_partition( + partition_df=df, + secondary_indices=secondary_indices, + sort_partitions_by=sort_partitions_by, + dataset_table_name=table, + dataset_uuid=dataset_uuid, + partition_on=partition_on, + store_factory=store_factory, + df_serializer=df_serializer, + metadata_version=metadata_version, ) diff --git a/kartothek/io/dask/dataframe.py b/kartothek/io/dask/dataframe.py index 47aa24af..90e84498 100644 --- a/kartothek/io/dask/dataframe.py +++ b/kartothek/io/dask/dataframe.py @@ -1,5 +1,14 @@ import random -from typing import Callable, Optional +from typing import ( + Callable, + Iterable, + List, + Mapping, + Optional, + SupportsFloat, + Union, + cast, +) import dask import dask.dataframe as dd @@ -11,6 +20,7 @@ from kartothek.core.docs import default_docs from kartothek.core.factory import DatasetFactory, _ensure_factory from kartothek.core.naming import DEFAULT_METADATA_VERSION +from kartothek.core.typing import StoreFactory, StoreInput from kartothek.io.dask.compression import pack_payload, unpack_payload_pandas from kartothek.io_components.metapartition import ( _METADATA_SCHEMA, @@ -27,9 +37,14 @@ normalize_args, validate_partition_keys, ) -from kartothek.serialization import PredicatesType +from kartothek.io_components.write import ( + raise_if_dataset_exists, + store_dataset_from_partitions, + write_partition, +) +from kartothek.serialization import DataFrameSerializer, PredicatesType -from ._update import update_dask_partitions_one_to_one, update_dask_partitions_shuffle +from ._shuffle import shuffle_store_dask_partitions from ._utils import _maybe_get_categoricals_from_index from .delayed import read_table_as_delayed @@ -140,29 +155,8 @@ def _get_dask_meta_for_dataset( return meta -@default_docs -def update_dataset_from_ddf( - ddf, - store=None, - dataset_uuid=None, - table=SINGLE_TABLE, - secondary_indices=None, - shuffle=False, - repartition_ratio=None, - num_buckets=1, - sort_partitions_by=None, - 
delete_scope=None, - metadata=None, - df_serializer=None, - metadata_merger=None, - default_metadata_version=DEFAULT_METADATA_VERSION, - partition_on=None, - factory=None, - bucket_by=None, -): - """ - Update a dataset from a dask.dataframe. - +def _shuffle_docs(func): + func.__doc__ += """ .. admonition:: Behavior without ``shuffle==False`` @@ -243,41 +237,104 @@ def update_dataset_from_ddf( .. note:: Only columns with data types which can be hashed are allowed to be used in this. +""" + return func + + +@default_docs +@_shuffle_docs +def store_dataset_from_ddf( + ddf: dd.DataFrame, + store: StoreInput, + dataset_uuid: str, + table: str = SINGLE_TABLE, + secondary_indices: Optional[List[str]] = None, + shuffle: bool = False, + repartition_ratio: Optional[SupportsFloat] = None, + num_buckets: int = 1, + sort_partitions_by: Optional[Union[List[str], str]] = None, + delete_scope: Optional[Iterable[Mapping[str, str]]] = None, + metadata: Optional[Mapping] = None, + df_serializer: Optional[DataFrameSerializer] = None, + metadata_merger: Optional[Callable] = None, + metadata_version: int = DEFAULT_METADATA_VERSION, + partition_on: Optional[List[str]] = None, + bucket_by: Optional[Union[List[str], str]] = None, + overwrite: bool = False, +): + """ + Store a dataset from a dask.dataframe. """ partition_on = normalize_arg("partition_on", partition_on) secondary_indices = normalize_arg("secondary_indices", secondary_indices) + sort_partitions_by = normalize_arg("sort_partitions_by", sort_partitions_by) + bucket_by = normalize_arg("bucket_by", bucket_by) store = normalize_arg("store", store) delete_scope = dask.delayed(normalize_arg)("delete_scope", delete_scope) if table is None: raise TypeError("The parameter `table` is not optional.") - ds_factory, metadata_version, partition_on = validate_partition_keys( - dataset_uuid=dataset_uuid, + + ds_factory = _ensure_factory( + dataset_uuid=dataset_uuid, store=store, factory=None, load_dataset_metadata=True + ) + + if not overwrite: + raise_if_dataset_exists(dataset_uuid=dataset_uuid, store=store) + mps = _write_dataframe_partitions( + ddf=ddf, store=store, - default_metadata_version=default_metadata_version, + dataset_uuid=dataset_uuid, + table=table, + secondary_indices=secondary_indices, + shuffle=shuffle, + repartition_ratio=repartition_ratio, + num_buckets=num_buckets, + sort_partitions_by=sort_partitions_by, + df_serializer=df_serializer, + metadata_version=metadata_version, partition_on=partition_on, - ds_factory=factory, + bucket_by=bucket_by, + ) + return dask.delayed(store_dataset_from_partitions)( + mps, + store=ds_factory.store_factory if ds_factory else store, + dataset_uuid=ds_factory.dataset_uuid if ds_factory else dataset_uuid, + dataset_metadata=metadata, + metadata_merger=metadata_merger, ) - if ds_factory is not None: - check_single_table_dataset(ds_factory, table) +def _write_dataframe_partitions( + ddf: dd.DataFrame, + store: StoreFactory, + dataset_uuid: str, + table: str, + secondary_indices: List[str], + shuffle: bool, + repartition_ratio: Optional[SupportsFloat], + num_buckets: int, + sort_partitions_by: List[str], + df_serializer: Optional[DataFrameSerializer], + metadata_version: int, + partition_on: List[str], + bucket_by: List[str], +) -> dd.Series: if repartition_ratio and ddf is not None: ddf = ddf.repartition( npartitions=int(np.ceil(ddf.npartitions / repartition_ratio)) ) if ddf is None: - mps = [ - parse_input_to_metapartition( - None, metadata_version=default_metadata_version - ) - ] + mps = dd.from_pandas( + pd.Series( 
+ [parse_input_to_metapartition(None, metadata_version=metadata_version)] + ), + npartitions=1, + ) else: - secondary_indices = _ensure_compatible_indices(ds_factory, secondary_indices) - if shuffle: - mps = update_dask_partitions_shuffle( + mps = shuffle_store_dask_partitions( ddf=ddf, table=table, secondary_indices=secondary_indices, @@ -291,10 +348,8 @@ def update_dataset_from_ddf( bucket_by=bucket_by, ) else: - delayed_tasks = ddf.to_delayed() - delayed_tasks = [{"data": {table: task}} for task in delayed_tasks] - mps = update_dask_partitions_one_to_one( - delayed_tasks=delayed_tasks, + mps = ddf.map_partitions( + write_partition, secondary_indices=secondary_indices, metadata_version=metadata_version, partition_on=partition_on, @@ -302,7 +357,73 @@ def update_dataset_from_ddf( df_serializer=df_serializer, dataset_uuid=dataset_uuid, sort_partitions_by=sort_partitions_by, + dataset_table_name=table, + meta=(MetaPartition), ) + return mps + + +@default_docs +@_shuffle_docs +def update_dataset_from_ddf( + ddf: dd.DataFrame, + store: Optional[StoreInput] = None, + dataset_uuid: Optional[str] = None, + table: str = SINGLE_TABLE, + secondary_indices: Optional[List[str]] = None, + shuffle: bool = False, + repartition_ratio: Optional[SupportsFloat] = None, + num_buckets: int = 1, + sort_partitions_by: Optional[Union[List[str], str]] = None, + delete_scope: Optional[Iterable[Mapping[str, str]]] = None, + metadata: Optional[Mapping] = None, + df_serializer: Optional[DataFrameSerializer] = None, + metadata_merger: Optional[Callable] = None, + default_metadata_version: int = DEFAULT_METADATA_VERSION, + partition_on: Optional[List[str]] = None, + factory: Optional[DatasetFactory] = None, + bucket_by: Optional[Union[List[str], str]] = None, +): + """ + Update a dataset from a dask.dataframe. 
+ """ + partition_on = normalize_arg("partition_on", partition_on) + secondary_indices = normalize_arg("secondary_indices", secondary_indices) + sort_partitions_by = normalize_arg("sort_partitions_by", sort_partitions_by) + bucket_by = normalize_arg("bucket_by", bucket_by) + store = normalize_arg("store", store) + delete_scope = dask.delayed(normalize_arg)("delete_scope", delete_scope) + + if table is None: + raise TypeError("The parameter `table` is not optional.") + ds_factory, metadata_version, partition_on = validate_partition_keys( + dataset_uuid=dataset_uuid, + store=store, + default_metadata_version=default_metadata_version, + partition_on=partition_on, + ds_factory=factory, + ) + + _ensure_compatible_indices(ds_factory, secondary_indices) + + if ds_factory is not None: + check_single_table_dataset(ds_factory, table) + + mps = _write_dataframe_partitions( + ddf=ddf, + store=store, + dataset_uuid=dataset_uuid or ds_factory.dataset_uuid, + table=table, + secondary_indices=secondary_indices, + shuffle=shuffle, + repartition_ratio=repartition_ratio, + num_buckets=num_buckets, + sort_partitions_by=sort_partitions_by, + df_serializer=df_serializer, + metadata_version=metadata_version, + partition_on=cast(List[str], partition_on), + bucket_by=bucket_by, + ) return dask.delayed(update_dataset_from_partitions)( mps, store_factory=store, @@ -414,7 +535,7 @@ def _hash_partition(part): @default_docs @normalize_args def hash_dataset( - store: Optional[Callable[[], KeyValueStore]] = None, + store: Optional[StoreInput] = None, dataset_uuid: Optional[str] = None, subset=None, group_key=None, diff --git a/kartothek/io/dask/dataframe_cube.py b/kartothek/io/dask/dataframe_cube.py index 2a1ea4e8..64d235cb 100644 --- a/kartothek/io/dask/dataframe_cube.py +++ b/kartothek/io/dask/dataframe_cube.py @@ -1,15 +1,33 @@ """ Dask.DataFrame IO. """ +from typing import Any, Callable, Dict, Iterable, Optional, Union + +import dask import dask.bag as db -import dask.dataframe as ddf +import dask.dataframe as dd +import simplekv +from dask.delayed import Delayed +from kartothek.api.discover import discover_datasets_unchecked +from kartothek.core.cube.cube import Cube +from kartothek.core.docs import default_docs from kartothek.io.dask.common_cube import ( append_to_cube_from_bag_internal, - build_cube_from_bag_internal, extend_cube_from_bag_internal, query_cube_bag_internal, ) +from kartothek.io.dask.dataframe import store_dataset_from_ddf +from kartothek.io_components.cube.common import check_store_factory +from kartothek.io_components.cube.write import ( + apply_postwrite_checks, + assert_dimesion_index_cols_notnull, + check_datasets_prebuild, + check_provided_metadata_dict, + check_user_df, + prepare_ktk_metadata, + prepare_ktk_partition_on, +) __all__ = ( "append_to_cube_from_dataframe", @@ -19,49 +37,94 @@ ) +@default_docs def build_cube_from_dataframe( - data, cube, store, metadata=None, overwrite=False, partition_on=None -): + data: Union[dd.DataFrame, Dict[str, dd.DataFrame]], + cube: Cube, + store: Callable[[], simplekv.KeyValueStore], + metadata: Optional[Dict[str, Dict[str, Any]]] = None, + overwrite: bool = False, + partition_on: Optional[Dict[str, Iterable[str]]] = None, + shuffle: bool = False, + num_buckets: int = 1, + bucket_by: Optional[Iterable[str]] = None, +) -> Delayed: """ Create dask computation graph that builds a cube with the data supplied from a dask dataframe. Parameters ---------- - data: Union[dask.DataFrame, Dict[str, dask.DataFrame] + data: Data that should be written to the cube. 
If only a single dataframe is given, it is assumed to be the seed dataset. - cube: kartothek.core.cube.cube.Cube + cube: Cube specification. - store: Callable[[], simplekv.KeyValueStore] + store: Store to which the data should be written to. - metadata: Optional[Dict[str, Dict[str, Any]]] + metadata: Metadata for every dataset. - overwrite: bool + overwrite: If possibly existing datasets should be overwritten. - partition_on: Optional[Dict[str, Iterable[str]]] + partition_on: Optional parition-on attributes for datasets (dictionary mapping :term:`Dataset ID` -> columns). See :ref:`Dimensionality and Partitioning Details` for details. Returns ------- - metadata_dict: dask.Delayed + metadata_dict: A dask delayed object containing the compute graph to build a cube returning the dict of dataset metadata objects. """ - data, ktk_cube_dataset_ids = _ddfs_to_bag(data, cube) + check_store_factory(store) + if not isinstance(data, dict): + data = {cube.seed_dataset: data} - return ( - build_cube_from_bag_internal( - data=data, + ktk_cube_dataset_ids = sorted(data.keys()) + + metadata = check_provided_metadata_dict(metadata, ktk_cube_dataset_ids) + existing_datasets = discover_datasets_unchecked(cube.uuid_prefix, store) + check_datasets_prebuild(ktk_cube_dataset_ids, cube, existing_datasets) + partition_on_checked = prepare_ktk_partition_on( + cube, ktk_cube_dataset_ids, partition_on + ) + del partition_on + + dct = {} + for table_name, ddf in data.items(): + check_user_df(table_name, ddf, cube, set(), partition_on_checked[table_name]) + + indices_to_build = set(cube.index_columns) & set(ddf.columns) + if table_name == cube.seed_dataset: + indices_to_build |= set(cube.dimension_columns) + indices_to_build -= set(partition_on_checked[table_name]) + + ddf = ddf.map_partitions( + assert_dimesion_index_cols_notnull, + ktk_cube_dataset_id=table_name, cube=cube, + partition_on=partition_on_checked[table_name], + meta=ddf._meta, + ) + graph = store_dataset_from_ddf( + ddf, + dataset_uuid=cube.ktk_dataset_uuid(table_name), store=store, - ktk_cube_dataset_ids=ktk_cube_dataset_ids, - metadata=metadata, + metadata=prepare_ktk_metadata(cube, table_name, metadata), + partition_on=partition_on_checked[table_name], + secondary_indices=sorted(indices_to_build), + sort_partitions_by=sorted( + (set(cube.dimension_columns) - set(cube.partition_columns)) + & set(ddf.columns) + ), overwrite=overwrite, - partition_on=partition_on, + shuffle=shuffle, + num_buckets=num_buckets, + bucket_by=bucket_by, ) - .map_partitions(_unpack_list, default=None) - .to_delayed()[0] + dct[table_name] = graph + + return dask.delayed(apply_postwrite_checks)( + dct, cube=cube, store=store, existing_datasets=existing_datasets ) @@ -169,7 +232,7 @@ def query_cube_dataframe( dfs = b.map_partitions(_unpack_list, default=empty).to_delayed() - return ddf.from_delayed( + return dd.from_delayed( dfs=dfs, meta=empty, divisions=None # TODO: figure out an API to support this ) diff --git a/kartothek/io/dask/delayed.py b/kartothek/io/dask/delayed.py index 0c65be25..3646a3bb 100644 --- a/kartothek/io/dask/delayed.py +++ b/kartothek/io/dask/delayed.py @@ -37,9 +37,9 @@ from kartothek.io_components.write import ( raise_if_dataset_exists, store_dataset_from_partitions, + write_partition, ) -from ._update import update_dask_partitions_one_to_one from ._utils import ( _cast_categorical_to_index_cat, _get_data, @@ -470,8 +470,9 @@ def update_dataset_from_delayed( ) secondary_indices = _ensure_compatible_indices(ds_factory, secondary_indices) - mps = 
update_dask_partitions_one_to_one( - delayed_tasks=delayed_tasks, + mps = map_delayed( + write_partition, + delayed_tasks, secondary_indices=secondary_indices, metadata_version=metadata_version, partition_on=partition_on, diff --git a/kartothek/io/eager.py b/kartothek/io/eager.py index 192c11ed..006292a4 100644 --- a/kartothek/io/eager.py +++ b/kartothek/io/eager.py @@ -679,7 +679,7 @@ def update_dataset_from_dataframes( partition_on: Optional[List[str]] = None, load_dynamic_metadata: bool = True, sort_partitions_by: Optional[str] = None, - secondary_indices: List[str] = None, + secondary_indices: Optional[List[str]] = None, factory: Optional[DatasetFactory] = None, ) -> DatasetMetadata: """ @@ -725,7 +725,7 @@ def update_dataset_from_dataframes( ) if sort_partitions_by: - mp = mp.apply(partial(sort_values_categorical, column=sort_partitions_by)) + mp = mp.apply(partial(sort_values_categorical, columns=sort_partitions_by)) if partition_on: mp = mp.partition_on(partition_on) diff --git a/kartothek/io/eager_cube.py b/kartothek/io/eager_cube.py index 95a1b74f..92603a3d 100644 --- a/kartothek/io/eager_cube.py +++ b/kartothek/io/eager_cube.py @@ -30,6 +30,7 @@ reduce_stats, ) from kartothek.io_components.cube.write import ( + MultiTableCommitAborted, apply_postwrite_checks, check_datasets_prebuild, check_datasets_preextend, @@ -659,10 +660,13 @@ def _prepare_data_for_ktk_all(data, cube, existing_payload, partition_on): if part.is_sentinel } if empty_datasets: - raise ValueError( + cause = ValueError( "Cannot write empty datasets: {empty_datasets}".format( empty_datasets=", ".join(sorted(empty_datasets)) ) ) + exc = MultiTableCommitAborted("Aborting commit.") + exc.__cause__ = cause + raise exc return data diff --git a/kartothek/io/iter.py b/kartothek/io/iter.py index 01245e04..86926b49 100644 --- a/kartothek/io/iter.py +++ b/kartothek/io/iter.py @@ -234,7 +234,7 @@ def update_dataset_from_dataframes__iter( if sort_partitions_by: # Define function which sorts each partition by column sort_partitions_by_fn = partial( - sort_values_categorical, column=sort_partitions_by + sort_values_categorical, columns=sort_partitions_by ) new_partitions = [] diff --git a/kartothek/io/testing/build_cube.py b/kartothek/io/testing/build_cube.py index f4abb576..171e7127 100644 --- a/kartothek/io/testing/build_cube.py +++ b/kartothek/io/testing/build_cube.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- import numpy as np import pandas as pd import pandas.testing as pdt @@ -17,6 +16,7 @@ from kartothek.core.dataset import DatasetMetadata from kartothek.core.index import ExplicitSecondaryIndex, PartitionIndex from kartothek.io.eager import store_dataframes_as_dataset +from kartothek.io_components.cube.write import MultiTableCommitAborted from kartothek.io_components.metapartition import SINGLE_TABLE __all__ = ( @@ -127,9 +127,7 @@ def test_simple_two_datasets(driver, function_store): assert isinstance(ds_source.indices["p"], PartitionIndex) assert isinstance(ds_source.indices["x"], ExplicitSecondaryIndex) - assert set(ds_enrich.indices.keys()) == { - "p", - } + assert set(ds_enrich.indices.keys()) == {"p"} assert isinstance(ds_enrich.indices["p"], PartitionIndex) assert set(ds_source.table_meta) == {SINGLE_TABLE} @@ -236,7 +234,7 @@ def test_parquet(driver, function_store): .rename(columns={"föö".encode("utf8"): "föö"}) ) - pdt.assert_frame_equal(df_actual, df_expected) + pdt.assert_frame_equal(df_actual.reset_index(drop=True), df_expected) def test_fail_sparse(driver, driver_name, function_store): @@ -546,10 +544,12 @@ 
def test_empty_df(driver, function_store, empty_first): ) # DS metadata, "x" index, common metadata, 1 partition -def test_fail_duplicates_local(driver, function_store): +def test_fail_duplicates_local(driver, driver_name, function_store): """ Might happen during DB queries. """ + if driver_name == "dask_dataframe": + pytest.xfail(reason="Cannot guarantee duplicates for DDF") df = pd.DataFrame( { "x": [0, 0], @@ -650,13 +650,16 @@ def test_fail_wrong_types(driver, function_store): uuid_prefix="cube", seed_dataset="source", ) - with pytest.raises(ValueError) as exc: + with pytest.raises(MultiTableCommitAborted) as exc_info: driver( data={"source": df_source, "enrich": df_enrich}, cube=cube, store=function_store, ) - assert 'Found incompatible entries for column "x"' in str(exc.value) + + cause = exc_info.value.__cause__ + assert isinstance(cause, ValueError) + assert 'Found incompatible entries for column "x"' in str(cause) assert not DatasetMetadata.exists(cube.ktk_dataset_uuid("source"), function_store()) assert not DatasetMetadata.exists(cube.ktk_dataset_uuid("enrich"), function_store()) @@ -711,13 +714,15 @@ def test_fail_nondistinc_payload(driver, function_store): uuid_prefix="cube", seed_dataset="source", ) - with pytest.raises(ValueError) as exc: + with pytest.raises(MultiTableCommitAborted) as exc_info: driver( data={"source": df_source, "enrich": df_enrich}, cube=cube, store=function_store, ) - assert "Found columns present in multiple datasets" in str(exc.value) + cause = exc_info.value.__cause__ + assert isinstance(cause, ValueError) + assert "Found columns present in multiple datasets" in str(cause) assert not DatasetMetadata.exists(cube.ktk_dataset_uuid("source"), function_store()) assert not DatasetMetadata.exists(cube.ktk_dataset_uuid("enrich"), function_store()) @@ -805,10 +810,12 @@ def test_fail_partial_build(driver, function_store): assert set(function_store().keys()) == keys -def test_fails_projected_duplicates(driver, function_store): +def test_fails_projected_duplicates(driver, driver_name, function_store): """ Test if duplicate check also works w/ projected data. (was a regression) """ + if driver_name == "dask_dataframe": + pytest.xfail(reason="Cannot guarantee duplicates for DDF") df_source = pd.DataFrame( { "x": [0, 1, 0, 1], @@ -897,9 +904,8 @@ def test_fails_null_dimension(driver, function_store): cube = Cube(dimension_columns=["x"], partition_columns=["p"], uuid_prefix="cube") with pytest.raises(ValueError) as exc: driver(data=df, cube=cube, store=function_store) - assert 'Found NULL-values in dimension column "x" of dataset "seed"' in str( - exc.value - ) + + assert 'Found NULL-values in dimension column "x" of dataset "seed"' in str(exc) assert not DatasetMetadata.exists(cube.ktk_dataset_uuid("seed"), function_store()) @@ -939,11 +945,11 @@ def test_fails_null_index(driver, function_store): ) with pytest.raises(ValueError) as exc: driver(data=df, cube=cube, store=function_store) - assert 'Found NULL-values in index column "i1" of dataset "seed"' in str(exc.value) + assert 'Found NULL-values in index column "i1"' in str(exc.value) assert not DatasetMetadata.exists(cube.ktk_dataset_uuid("seed"), function_store()) -def test_fail_all_empty(driver, function_store): +def test_fail_all_empty(driver, driver_name, function_store): """ Might happen due to DB-based filters. 
""" @@ -951,9 +957,12 @@ def test_fail_all_empty(driver, function_store): {"x": [0, 1, 2, 3], "p": [0, 0, 1, 1], "v": [10, 11, 12, 13]} ).loc[[]] cube = Cube(dimension_columns=["x"], partition_columns=["p"], uuid_prefix="cube") - with pytest.raises(ValueError) as exc: + + with pytest.raises(MultiTableCommitAborted) as exc_info: driver(data=df, cube=cube, store=function_store) - assert "Cannot write empty datasets: seed" in str(exc.value) + exc = exc_info.value.__cause__ + assert isinstance(exc, ValueError) + assert "Cannot write empty datasets" in str(exc) assert not DatasetMetadata.exists(cube.ktk_dataset_uuid("source"), function_store()) assert not DatasetMetadata.exists(cube.ktk_dataset_uuid("enrich"), function_store()) @@ -1004,14 +1013,16 @@ def test_overwrite_rollback_ktk_cube(driver, function_store): df_enrich2 = pd.DataFrame( {"x": [10, 11], "p": [10, 10], "v1": [20, 21], "i4": [20, 21]} ) - with pytest.raises(ValueError) as exc: + with pytest.raises(MultiTableCommitAborted) as exc_info: driver( data={"source": df_source2, "enrich": df_enrich2}, cube=cube, store=function_store, overwrite=True, ) - assert str(exc.value).startswith("Found columns present in multiple datasets:") + cause = exc_info.value.__cause__ + assert isinstance(cause, ValueError) + assert str(cause).startswith("Found columns present in multiple datasets:") ds_source = DatasetMetadata.load_from_store( uuid=cube.ktk_dataset_uuid("source"), store=function_store() @@ -1090,14 +1101,15 @@ def test_overwrite_rollback_ktk(driver, function_store): "i4": [20, 21], } ) - with pytest.raises(ValueError) as exc: + with pytest.raises(MultiTableCommitAborted) as exc_info: driver( data={"source": df_source2, "enrich": df_enrich2}, cube=cube, store=function_store, overwrite=True, ) - assert str(exc.value).startswith("Found columns present in multiple datasets:") + cause = exc_info.value.__cause__ + assert str(cause).startswith("Found columns present in multiple datasets:") ds_source = DatasetMetadata.load_from_store( uuid=cube.ktk_dataset_uuid(cube.seed_dataset), store=function_store() @@ -1211,10 +1223,7 @@ def test_fail_partition_on_1(driver, function_store): dimension_columns=["x"], partition_columns=["p", "q"], uuid_prefix="cube" ) - with pytest.raises( - ValueError, - match="Seed dataset seed must have the following, fixed partition-on attribute: p, q", - ): + with pytest.raises(ValueError) as exc_info: driver( data=df, cube=cube, @@ -1222,6 +1231,13 @@ def test_fail_partition_on_1(driver, function_store): partition_on={cube.seed_dataset: ["x", "p"]}, ) + cause = exc_info.value # .__cause__ + assert isinstance(cause, ValueError) + assert ( + "Seed dataset seed must have the following, fixed partition-on attribute: p, q" + in str(cause) + ) + assert set(function_store().keys()) == set() @@ -1417,13 +1433,15 @@ def test_fail_partition_on_nondistinc_payload(driver, function_store): uuid_prefix="cube", seed_dataset="source", ) - with pytest.raises(ValueError) as exc: + with pytest.raises(MultiTableCommitAborted) as exc_info: driver( data={"source": df_source, "enrich": df_enrich}, cube=cube, store=function_store, partition_on={"enrich": ["v1"]}, ) - assert "Found columns present in multiple datasets" in str(exc.value) + cause = exc_info.value.__cause__ + assert isinstance(cause, ValueError) + assert "Found columns present in multiple datasets" in str(cause) assert not DatasetMetadata.exists(cube.ktk_dataset_uuid("source"), function_store()) assert not DatasetMetadata.exists(cube.ktk_dataset_uuid("enrich"), function_store()) diff 
--git a/kartothek/io/testing/extend_cube.py b/kartothek/io/testing/extend_cube.py index 1bdaccd0..76530be4 100644 --- a/kartothek/io/testing/extend_cube.py +++ b/kartothek/io/testing/extend_cube.py @@ -5,6 +5,7 @@ from kartothek.core.dataset import DatasetMetadata from kartothek.core.index import ExplicitSecondaryIndex, PartitionIndex from kartothek.io.eager_cube import build_cube +from kartothek.io_components.cube.write import MultiTableCommitAborted from kartothek.io_components.metapartition import SINGLE_TABLE __all__ = ( @@ -90,9 +91,11 @@ def test_fails_incompatible_dtypes(driver, function_store, existing_cube): "i3": [100, 101, 102, 103], } ) - with pytest.raises(ValueError) as exc: + with pytest.raises(MultiTableCommitAborted) as exc_info: driver(data={"extra": df}, cube=existing_cube, store=function_store) - assert 'Found incompatible entries for column "x"' in str(exc.value) + cause = exc_info.value.__cause__ + assert isinstance(cause, ValueError) + assert 'Found incompatible entries for column "x"' in str(cause) assert not DatasetMetadata.exists( existing_cube.ktk_dataset_uuid("extra"), function_store() ) @@ -277,9 +280,11 @@ def test_fail_all_empty(driver, function_store, existing_cube): {"x": [0, 1, 2, 3], "p": [0, 0, 1, 1], "v": [10, 11, 12, 13]} ).loc[[]] - with pytest.raises(ValueError) as exc: + with pytest.raises(MultiTableCommitAborted) as exc_info: driver(data={"extra": df}, cube=existing_cube, store=function_store) - assert "Cannot write empty datasets: extra" in str(exc.value) + exc = exc_info.value.__cause__ + assert isinstance(exc, ValueError) + assert "Cannot write empty datasets: extra" in str(exc) assert not DatasetMetadata.exists( existing_cube.ktk_dataset_uuid("extra"), function_store() ) diff --git a/kartothek/io/testing/update.py b/kartothek/io/testing/update.py index 25a94d8f..a18b0e8d 100644 --- a/kartothek/io/testing/update.py +++ b/kartothek/io/testing/update.py @@ -673,7 +673,7 @@ def test_partition_on_null(store_factory, bound_update_dataset): # gh-262 ) with pytest.raises( - Exception, match=r"Original dataframe size .* on a column with null values.", + Exception, match=r"Original dataframe size .* on a column with null values." ): bound_update_dataset( [{"data": {"table": df}}], @@ -681,3 +681,66 @@ def test_partition_on_null(store_factory, bound_update_dataset): # gh-262 dataset_uuid="a_unique_dataset_identifier", partition_on=["part"], ) + + +def test_update_infers_partition_on(store_factory, bound_update_dataset, df_not_nested): + dataset_uuid = "dataset_uuid" + + dataset = bound_update_dataset( + [{"data": {"table": df_not_nested}}], + dataset_uuid=dataset_uuid, + store=store_factory, + partition_on=df_not_nested.columns[0], + ) + # update the dataset + # do not use partition_on since it should be interfered from the existing dataset + + updated_dataset = bound_update_dataset( + [{"data": {"table": df_not_nested}}], + dataset_uuid=dataset_uuid, + store=store_factory, + ) + + assert len(updated_dataset.partitions) == 2 * len(dataset.partitions) + + +def test_update_raises_incompatible_partition_keys( + store_factory, bound_update_dataset, df_not_nested +): + dataset_uuid = "dataset_uuid" + bound_update_dataset( + [{"data": {"table": df_not_nested}}], + dataset_uuid=dataset_uuid, + store=store_factory, + partition_on=df_not_nested.columns[0], + ) + # Not allowed to use different partition_on + with pytest.raises( + ValueError, match="Incompatible set of partition keys encountered." 
+ ): + bound_update_dataset( + [{"data": {"table": df_not_nested}}], + dataset_uuid=dataset_uuid, + store=store_factory, + partition_on=df_not_nested.columns[1], + ) + + +def test_update_raises_incompatible_inidces( + store_factory, bound_update_dataset, df_not_nested +): + dataset_uuid = "dataset_uuid" + bound_update_dataset( + [{"data": {"table": df_not_nested}}], + dataset_uuid=dataset_uuid, + store=store_factory, + secondary_indices=df_not_nested.columns[0], + ) + # Not allowed to update with indices which do not yet exist in dataset + with pytest.raises(ValueError, match="indices"): + bound_update_dataset( + [{"data": {"table": df_not_nested}}], + dataset_uuid=dataset_uuid, + store=store_factory, + secondary_indices=df_not_nested.columns[1], + ) diff --git a/kartothek/io_components/cube/write.py b/kartothek/io_components/cube/write.py index 93508de4..5c07c13d 100644 --- a/kartothek/io_components/cube/write.py +++ b/kartothek/io_components/cube/write.py @@ -3,7 +3,9 @@ """ import itertools from copy import copy +from typing import Dict, Iterable, Optional, Sequence, Tuple +import dask.dataframe as dd import pandas as pd from pandas.api.types import is_sparse @@ -15,6 +17,7 @@ KTK_CUBE_METADATA_PARTITION_COLUMNS, KTK_CUBE_METADATA_VERSION, ) +from kartothek.core.cube.cube import Cube from kartothek.core.dataset import DatasetMetadataBuilder from kartothek.core.naming import metadata_key_from_uuid from kartothek.core.uuid import gen_uuid @@ -125,7 +128,37 @@ def prepare_ktk_metadata(cube, ktk_cube_dataset_id, metadata): return ds_metadata -def _check_user_df(ktk_cube_dataset_id, df, cube, existing_payload, partition_on): +def assert_dimesion_index_cols_notnull( + df: pd.DataFrame, ktk_cube_dataset_id: str, cube: Cube, partition_on: Sequence[str] +) -> pd.DataFrame: + """ + Assert that index and dimesion columns are not NULL and raise an appropriate error if so. + + .. note:: + + Indices for plain non-cube dataset drop null during index build! + """ + + df_columns_set = set(df.columns) + dcols_present = set(cube.dimension_columns) & df_columns_set + icols_present = set(cube.index_columns) & df_columns_set + + for cols, what in ( + (partition_on, "partition"), + (dcols_present, "dimension"), + (icols_present, "index"), + ): + for col in sorted(cols): + if df[col].isnull().any(): + raise ValueError( + 'Found NULL-values in {what} column "{col}" of dataset "{ktk_cube_dataset_id}"'.format( + col=col, ktk_cube_dataset_id=ktk_cube_dataset_id, what=what + ) + ) + return df + + +def check_user_df(ktk_cube_dataset_id, df, cube, existing_payload, partition_on): """ Check user-provided DataFrame for sanity. 
@@ -149,7 +182,7 @@ def _check_user_df(ktk_cube_dataset_id, df, cube, existing_payload, partition_on """ if df is None: return - if not isinstance(df, pd.DataFrame): + if not (isinstance(df, pd.DataFrame) or isinstance(df, dd.DataFrame)): raise TypeError( 'Provided DataFrame is not a pandas.DataFrame or None, but is a "{t}"'.format( t=type(df).__name__ @@ -162,7 +195,6 @@ def _check_user_df(ktk_cube_dataset_id, df, cube, existing_payload, partition_on df_columns = list(df.columns) df_columns_set = set(df_columns) dcols_present = set(cube.dimension_columns) & df_columns_set - icols_present = set(cube.index_columns) & df_columns_set if len(df_columns) != len(df_columns_set): raise ValueError( @@ -201,18 +233,15 @@ def _check_user_df(ktk_cube_dataset_id, df, cube, existing_payload, partition_on ) ) - for cols, what in ( - (partition_on, "partition"), - (dcols_present, "dimension"), - (icols_present, "index"), - ): - for col in sorted(cols): - if df[col].isnull().any(): - raise ValueError( - 'Found NULL-values in {what} column "{col}" of dataset "{ktk_cube_dataset_id}"'.format( - col=col, ktk_cube_dataset_id=ktk_cube_dataset_id, what=what - ) - ) + # Factor this check out. All others can be performed on the dask.DataFrame. + # This one can only be executed on a pandas DataFame + if isinstance(df, pd.DataFrame): + assert_dimesion_index_cols_notnull( + ktk_cube_dataset_id=ktk_cube_dataset_id, + df=df, + cube=cube, + partition_on=partition_on, + ) payload = get_payload_subset(df.columns, cube) payload_overlap = payload & existing_payload @@ -291,7 +320,7 @@ def prepare_data_for_ktk( ValueError In case anything is fishy. """ - _check_user_df(ktk_cube_dataset_id, df, cube, existing_payload, partition_on) + check_user_df(ktk_cube_dataset_id, df, cube, existing_payload, partition_on) if (df is None) or df.empty: # fast-path for empty DF @@ -372,6 +401,10 @@ def multiplex_user_input(data, cube): return data +class MultiTableCommitAborted(RuntimeError): + """An Error occured during the commit of a MultiTable dataset (Cube) causing a rollback.""" + + def apply_postwrite_checks(datasets, cube, store, existing_datasets): """ Apply sanity checks that can only be done after Kartothek has written its datasets. @@ -401,8 +434,9 @@ def apply_postwrite_checks(datasets, cube, store, existing_datasets): empty_datasets = { ktk_cube_dataset_id for ktk_cube_dataset_id, ds in datasets.items() - if SINGLE_TABLE not in ds.table_meta + if SINGLE_TABLE not in ds.table_meta or len(ds.partitions) == 0 } + if empty_datasets: raise ValueError( "Cannot write empty datasets: {empty_datasets}".format( @@ -418,7 +452,9 @@ def apply_postwrite_checks(datasets, cube, store, existing_datasets): existing_datasets=existing_datasets, new_datasets=datasets, store=store ) - raise e + raise MultiTableCommitAborted( + "Post commit check failed. Operation rolled back." + ) from e return datasets @@ -521,22 +557,26 @@ def _rollback_transaction(existing_datasets, new_datasets, store): ) -def prepare_ktk_partition_on(cube, ktk_cube_dataset_ids, partition_on): +def prepare_ktk_partition_on( + cube: Cube, + ktk_cube_dataset_ids: Iterable[str], + partition_on: Optional[Dict[str, Iterable[str]]], +) -> Dict[str, Tuple[str, ...]]: """ Prepare ``partition_on`` values for kartothek. Parameters ---------- - cube: kartothek.core.cube.cube.Cube + cube: Cube specification. - ktk_cube_dataset_ids: Iterable[str] + ktk_cube_dataset_ids: ktk_cube_dataset_ids announced by the user. 
- partition_on: Optional[Dict[str, Iterable[str]]] + partition_on: Optional parition-on attributes for datasets. Returns ------- - partition_on: Dict[str, Tuple[str, ...]] + partition_on: Partition-on per dataset. Raises diff --git a/kartothek/io_components/metapartition.py b/kartothek/io_components/metapartition.py index e92d05b0..0edc156b 100644 --- a/kartothek/io_components/metapartition.py +++ b/kartothek/io_components/metapartition.py @@ -7,7 +7,7 @@ from collections import defaultdict, namedtuple from copy import copy from functools import wraps -from typing import Any, Dict, Iterable, Iterator, Optional, cast +from typing import Any, Dict, Iterable, Iterator, Optional, Sequence, Set, Union, cast import numpy as np import pandas as pd @@ -1216,7 +1216,7 @@ def _renormalize_meta(meta): return mp @_apply_to_list - def build_indices(self, columns): + def build_indices(self, columns: Iterable[str]): """ This builds the indices for this metapartition for the given columns. The indices for the passed columns are rebuilt, so exisiting index entries in the metapartition are overwritten. @@ -1230,9 +1230,10 @@ def build_indices(self, columns): new_indices = {} for col in columns: - possible_values = set() + possible_values: Set[str] = set() col_in_partition = False for df in self.data.values(): + if col in df: possible_values = possible_values | set(df[col].dropna().unique()) col_in_partition = True @@ -1267,7 +1268,7 @@ def build_indices(self, columns): return self.copy(indices=new_indices) @_apply_to_list - def partition_on(self, partition_on): + def partition_on(self, partition_on: Union[str, Sequence[str]]): """ Partition all dataframes assigned to this MetaPartition according the the given columns. diff --git a/kartothek/io_components/utils.py b/kartothek/io_components/utils.py index b91e9513..8beedf47 100644 --- a/kartothek/io_components/utils.py +++ b/kartothek/io_components/utils.py @@ -4,15 +4,22 @@ import collections import inspect import logging -from typing import Callable, Optional +from typing import Callable, List, Optional, TypeVar, Union, overload import decorator import pandas as pd from kartothek.core.dataset import DatasetMetadata from kartothek.core.factory import _ensure_factory +from kartothek.core.typing import StoreFactory, StoreInput from kartothek.core.utils import ensure_store, lazy_store +try: + from typing_extensions import Literal # type: ignore +except ImportError: + from typing import Literal # type: ignore + + signature = inspect.signature @@ -109,9 +116,9 @@ def _ensure_compatible_indices(dataset, secondary_indices): if secondary_indices and set(ds_secondary_indices) != set(secondary_indices): raise ValueError( - "Incorrect indices provided for dataset.\n" - "Expected: {}\n" - "But got: {}".format(ds_secondary_indices, secondary_indices) + f"Incorrect indices provided for dataset.\n" + f"Expected: {ds_secondary_indices}\n" + f"But got: {secondary_indices}" ) return ds_secondary_indices else: @@ -178,14 +185,41 @@ def validate_partition_keys( return ds_factory, ds_metadata_version, partition_on -_NORMALIZE_ARGS = [ +_NORMALIZE_ARGS_LIST = [ "partition_on", "delete_scope", "secondary_indices", + "sort_partitions_by", + "bucket_by", "dispatch_by", - "store", ] +_NORMALIZE_ARGS = _NORMALIZE_ARGS_LIST + ["store"] + +T = TypeVar("T") + + +@overload +def normalize_arg( + arg_name: Literal[ + "partition_on", + "delete_scope", + "secondary_indices", + "bucket_by", + "sort_partitions_by", + "dispatch_by", + ], + old_value: Optional[Union[T, List[T]]], +) -> List[T]: + 
... + + +@overload +def normalize_arg( + arg_name: Literal["store"], old_value: Optional[StoreInput] +) -> StoreFactory: + ... + def normalize_arg(arg_name, old_value): """ @@ -217,19 +251,15 @@ def _make_list(_args): ) return list(_args) - if arg_name in ["partition_on", "delete_scope", "secondary_indices", "dispatch_by"]: + if arg_name in _NORMALIZE_ARGS_LIST: if old_value is None: return [] elif isinstance(old_value, list): return old_value else: return _make_list(old_value) - - if arg_name in ["store"]: - if old_value is None: - return None - else: - return lazy_store(old_value) + elif arg_name == "store" and old_value is not None: + return lazy_store(old_value) return old_value @@ -338,21 +368,19 @@ def align_categories(dfs, categoricals): return return_dfs -def sort_values_categorical(df, column): +def sort_values_categorical(df: pd.DataFrame, columns: Union[List[str], str]): """ Sort a dataframe lexicographically by the categories of column `column` """ - if isinstance(column, list): - if len(column) == 1: - column = column[0] - else: - raise ValueError("Can only sort after a single column") - if pd.api.types.is_categorical_dtype(df[column]): - cat_accesor = df[column].cat - df[column] = cat_accesor.reorder_categories( - sorted(cat_accesor.categories), ordered=True - ) - return df.sort_values(by=[column]).reset_index(drop=True) + if not isinstance(columns, list): + columns = [columns] + for col in columns: + if pd.api.types.is_categorical_dtype(df[col]): + cat_accesor = df[col].cat + df[col] = cat_accesor.reorder_categories( + sorted(cat_accesor.categories), ordered=True + ) + return df.sort_values(by=columns).reset_index(drop=True) def check_single_table_dataset(dataset, expected_table=None): diff --git a/kartothek/io_components/write.py b/kartothek/io_components/write.py index 889f1fc0..50dc155d 100644 --- a/kartothek/io_components/write.py +++ b/kartothek/io_components/write.py @@ -1,8 +1,8 @@ -# -*- coding: utf-8 -*- - - from collections import defaultdict -from typing import Dict, cast +from functools import partial +from typing import Any, Dict, Optional, Sequence, Union, cast + +import pandas as pd from kartothek.core import naming from kartothek.core.common_metadata import ( @@ -14,18 +14,63 @@ from kartothek.core.dataset import DatasetMetadataBuilder from kartothek.core.index import ExplicitSecondaryIndex, IndexBase, PartitionIndex from kartothek.core.partition import Partition -from kartothek.core.typing import StoreInput +from kartothek.core.typing import StoreFactory, StoreInput from kartothek.core.utils import ensure_store from kartothek.io_components.metapartition import ( SINGLE_TABLE, MetaPartition, + parse_input_to_metapartition, partition_labels_from_mps, ) -from kartothek.io_components.utils import combine_metadata, extract_duplicates +from kartothek.io_components.utils import ( + combine_metadata, + extract_duplicates, + sort_values_categorical, +) +from kartothek.serialization import DataFrameSerializer SINGLE_CATEGORY = SINGLE_TABLE +def write_partition( + partition_df: Any, # TODO: Establish typing for parse_input_to_metapartition + secondary_indices: Optional[Union[str, Sequence[str]]], + sort_partitions_by: Optional[Union[str, Sequence[str]]], + dataset_uuid: str, + partition_on: Optional[Union[str, Sequence[str]]], + store_factory: StoreFactory, + df_serializer: Optional[DataFrameSerializer], + metadata_version: int, + dataset_table_name: Optional[str] = None, +) -> MetaPartition: + """ + Write a dataframe to store, performing all necessary preprocessing 
tasks + like partitioning, bucketing (NotImplemented), indexing, etc. in the correct order. + """ + store = ensure_store(store_factory) + if isinstance(partition_df, pd.DataFrame) and dataset_table_name: + parse_input = [{"data": {dataset_table_name: partition_df}}] + else: + parse_input = partition_df + # delete reference to enable release after partition_on; before index build + del partition_df + # I don't have access to the group values + mps = parse_input_to_metapartition( + parse_input, + metadata_version=metadata_version, + expected_secondary_indices=secondary_indices, + ) + if sort_partitions_by: + mps = mps.apply(partial(sort_values_categorical, columns=sort_partitions_by)) + if partition_on: + mps = mps.partition_on(partition_on) + if secondary_indices: + mps = mps.build_indices(secondary_indices) + return mps.store_dataframes( + store=store, dataset_uuid=dataset_uuid, df_serializer=df_serializer + ) + + def persist_indices( store: StoreInput, dataset_uuid: str, indices: Dict[str, IndexBase] ) -> Dict[str, str]: diff --git a/tests/core/test_docs.py b/tests/core/test_docs.py index 5dba0642..81c22a5d 100644 --- a/tests/core/test_docs.py +++ b/tests/core/test_docs.py @@ -10,7 +10,11 @@ read_dataset_as_metapartitions_bag, store_bag_as_dataset, ) -from kartothek.io.dask.dataframe import read_dataset_as_ddf, update_dataset_from_ddf +from kartothek.io.dask.dataframe import ( + read_dataset_as_ddf, + store_dataset_from_ddf, + update_dataset_from_ddf, +) from kartothek.io.dask.delayed import ( delete_dataset__delayed, merge_datasets_as_delayed, @@ -49,6 +53,7 @@ store_bag_as_dataset, build_dataset_indices__bag, read_dataset_as_ddf, + store_dataset_from_ddf, update_dataset_from_ddf, delete_dataset__delayed, merge_datasets_as_delayed, diff --git a/tests/io/dask/dataframe/test_shuffle.py b/tests/io/dask/dataframe/test_shuffle.py new file mode 100644 index 00000000..2b281722 --- /dev/null +++ b/tests/io/dask/dataframe/test_shuffle.py @@ -0,0 +1,205 @@ +import pickle + +import dask.dataframe as dd +import numpy as np +import pandas as pd +import pandas.testing as pdt +import pytest + +from kartothek.core.factory import DatasetFactory +from kartothek.io.dask._shuffle import _KTK_HASH_BUCKET, _hash_bucket +from kartothek.io.dask.dataframe import store_dataset_from_ddf, update_dataset_from_ddf +from kartothek.io.iter import read_dataset_as_dataframes__iterator + + +@pytest.mark.parametrize("col", ["range", "range_duplicated", "random"]) +def test_hash_bucket(col, num_buckets=5): + df = pd.DataFrame( + { + "range": np.arange(10), + "range_duplicated": np.repeat(np.arange(2), 5), + "random": np.random.randint(0, 100, 10), + } + ) + hashed = _hash_bucket(df, [col], num_buckets) + assert (hashed.groupby(col).agg({_KTK_HASH_BUCKET: "nunique"}) == 1).all().all() + + # Check that hashing is consistent for small dataframe sizes (where df.col.nunique() < num_buckets) + df_sample = df.iloc[[0, 7]] + hashed_sample = _hash_bucket(df_sample, [col], num_buckets) + expected = hashed.loc[df_sample.index] + pdt.assert_frame_equal(expected, hashed_sample) + + +def test_hashing_determinism(): + """Make sure that the hashing algorithm used by pandas is independent of any context variables""" + df = pd.DataFrame({"range": np.arange(10)}) + hashed = _hash_bucket(df, ["range"], 5) + expected = pd.DataFrame( + { + "range": [0, 1, 2, 3, 4, 5, 6, 7, 8, 9], + _KTK_HASH_BUCKET: np.uint8([0, 0, 1, 2, 0, 3, 2, 0, 1, 4]), + } + ) + pdt.assert_frame_equal(hashed, expected) + + +@pytest.mark.parametrize("bucket_by", [None, 
"range"]) +def test_update_shuffle_no_partition_on(store_factory, bucket_by): + df = pd.DataFrame( + { + "range": np.arange(10), + "range_duplicated": np.repeat(np.arange(2), 5), + "random": np.random.randint(0, 100, 10), + } + ) + ddf = dd.from_pandas(df, npartitions=10) + + with pytest.raises( + ValueError, match="``num_buckets`` must not be None when shuffling data." + ): + update_dataset_from_ddf( + ddf, + store_factory, + dataset_uuid="output_dataset_uuid", + table="table", + shuffle=True, + num_buckets=None, + bucket_by=bucket_by, + ).compute() + + res_default = update_dataset_from_ddf( + ddf, + store_factory, + dataset_uuid="output_dataset_uuid_default", + table="table", + shuffle=True, + bucket_by=bucket_by, + ).compute() + assert len(res_default.partitions) == 1 + + res = update_dataset_from_ddf( + ddf, + store_factory, + dataset_uuid="output_dataset_uuid", + table="table", + shuffle=True, + num_buckets=2, + bucket_by=bucket_by, + ).compute() + + assert len(res.partitions) == 2 + + +@pytest.mark.parametrize("unique_primaries", [1, 4]) +@pytest.mark.parametrize("unique_secondaries", [1, 3]) +@pytest.mark.parametrize("num_buckets", [1, 5]) +@pytest.mark.parametrize("repartition", [1, 2]) +@pytest.mark.parametrize("npartitions", [5, 10]) +@pytest.mark.parametrize("bucket_by", [None, "sorted_column"]) +@pytest.mark.parametrize("func", [update_dataset_from_ddf, store_dataset_from_ddf]) +def test_update_shuffle_buckets( + store_factory, + unique_primaries, + unique_secondaries, + num_buckets, + repartition, + npartitions, + bucket_by, + func, +): + """ + Assert that certain properties are always given for the output dataset + no matter how the input data distribution looks like + + Properties to assert: + * All partitions have a unique value for its correspondent primary key + * number of partitions is at least one per unique partition value, at + most ``num_buckets`` per primary partition value. + * If we demand a column to be sorted it is per partition monotonic + """ + + primaries = np.arange(unique_primaries) + secondary = np.arange(unique_secondaries) + num_rows = 100 + primaries = np.repeat(primaries, np.ceil(num_rows / unique_primaries))[:num_rows] + secondary = np.repeat(secondary, np.ceil(num_rows / unique_secondaries))[:num_rows] + # ensure that there is an unsorted column uncorrelated + # to the primary and secondary columns which can be sorted later on per partition + unsorted_column = np.repeat(np.arange(100 / 10), 10) + np.random.shuffle(unsorted_column) + np.random.shuffle(primaries) + np.random.shuffle(secondary) + + df = pd.DataFrame( + {"primary": primaries, "secondary": secondary, "sorted_column": unsorted_column} + ) + secondary_indices = ["secondary"] + expected_num_indices = 2 # One primary + + # used for tests later on to + if bucket_by: + secondary_indices.append(bucket_by) + expected_num_indices = 3 + + # shuffle all rows. 
properties of result should be reproducible + df = df.sample(frac=1).reset_index(drop=True) + ddf = dd.from_pandas(df, npartitions=npartitions) + + dataset_comp = func( + ddf, + store_factory, + dataset_uuid="output_dataset_uuid", + table="core", + secondary_indices=secondary_indices, + shuffle=True, + bucket_by=bucket_by, + repartition_ratio=repartition, + num_buckets=num_buckets, + sort_partitions_by="sorted_column", + partition_on=["primary"], + ) + + s = pickle.dumps(dataset_comp, pickle.HIGHEST_PROTOCOL) + dataset_comp = pickle.loads(s) + + dataset = dataset_comp.compute() + dataset = dataset.load_all_indices(store_factory()) + + assert len(dataset.partitions) <= num_buckets * unique_primaries + assert len(dataset.partitions) >= unique_primaries + + assert len(dataset.indices) == expected_num_indices + + assert set(dataset.indices["primary"].index_dct.keys()) == set( + range(unique_primaries) + ) + assert ( + list(map(lambda x: len(x), dataset.indices["primary"].index_dct.values())) + <= [num_buckets] * unique_primaries + ) + + assert set(dataset.indices["secondary"].index_dct.keys()) == set( + range(unique_secondaries) + ) + + assert set(dataset.table_meta["core"].names) == { + "primary", + "secondary", + "sorted_column", + } + + factory = DatasetFactory("output_dataset_uuid", store_factory) + factory.load_all_indices() + + if bucket_by: + ind_df = factory.get_indices_as_dataframe(["primary", bucket_by]) + + assert not ind_df.duplicated().any() + + for data_dct in read_dataset_as_dataframes__iterator( + dataset_uuid=dataset.uuid, store=store_factory + ): + df = data_dct["core"] + assert len(df.primary.unique()) == 1 + assert df.sorted_column.is_monotonic diff --git a/tests/io/dask/dataframe/test_update.py b/tests/io/dask/dataframe/test_update.py index abf54aab..474db25f 100644 --- a/tests/io/dask/dataframe/test_update.py +++ b/tests/io/dask/dataframe/test_update.py @@ -1,54 +1,13 @@ -# -*- coding: utf-8 -*- -# pylint: disable=E1101 - import pickle import dask import dask.dataframe as dd -import numpy as np -import pandas as pd -import pandas.testing as pdt import pytest -from kartothek.core.factory import DatasetFactory -from kartothek.io.dask._update import _KTK_HASH_BUCKET, _hash_bucket from kartothek.io.dask.dataframe import update_dataset_from_ddf -from kartothek.io.iter import read_dataset_as_dataframes__iterator from kartothek.io.testing.update import * # noqa -@pytest.mark.parametrize("col", ["range", "range_duplicated", "random"]) -def test_hash_bucket(col, num_buckets=5): - df = pd.DataFrame( - { - "range": np.arange(10), - "range_duplicated": np.repeat(np.arange(2), 5), - "random": np.random.randint(0, 100, 10), - } - ) - hashed = _hash_bucket(df, [col], num_buckets) - assert (hashed.groupby(col).agg({_KTK_HASH_BUCKET: "nunique"}) == 1).all().all() - - # Check that hashing is consistent for small dataframe sizes (where df.col.nunique() < num_buckets) - df_sample = df.iloc[[0, 7]] - hashed_sample = _hash_bucket(df_sample, [col], num_buckets) - expected = hashed.loc[df_sample.index] - pdt.assert_frame_equal(expected, hashed_sample) - - -def test_hashing_determinism(): - """Make sure that the hashing algorithm used by pandas is independent of any context variables""" - df = pd.DataFrame({"range": np.arange(10)}) - hashed = _hash_bucket(df, ["range"], 5) - expected = pd.DataFrame( - { - "range": [0, 1, 2, 3, 4, 5, 6, 7, 8, 9], - _KTK_HASH_BUCKET: np.uint8([0, 0, 1, 2, 0, 3, 2, 0, 1, 4]), - } - ) - pdt.assert_frame_equal(hashed, expected) - - @pytest.fixture def 
diff --git a/tests/io/dask/dataframe/test_update.py b/tests/io/dask/dataframe/test_update.py
index abf54aab..474db25f 100644
--- a/tests/io/dask/dataframe/test_update.py
+++ b/tests/io/dask/dataframe/test_update.py
@@ -1,54 +1,13 @@
-# -*- coding: utf-8 -*-
-# pylint: disable=E1101
-
 import pickle
 
 import dask
 import dask.dataframe as dd
-import numpy as np
-import pandas as pd
-import pandas.testing as pdt
 import pytest
 
-from kartothek.core.factory import DatasetFactory
-from kartothek.io.dask._update import _KTK_HASH_BUCKET, _hash_bucket
 from kartothek.io.dask.dataframe import update_dataset_from_ddf
-from kartothek.io.iter import read_dataset_as_dataframes__iterator
 from kartothek.io.testing.update import *  # noqa
 
 
-@pytest.mark.parametrize("col", ["range", "range_duplicated", "random"])
-def test_hash_bucket(col, num_buckets=5):
-    df = pd.DataFrame(
-        {
-            "range": np.arange(10),
-            "range_duplicated": np.repeat(np.arange(2), 5),
-            "random": np.random.randint(0, 100, 10),
-        }
-    )
-    hashed = _hash_bucket(df, [col], num_buckets)
-    assert (hashed.groupby(col).agg({_KTK_HASH_BUCKET: "nunique"}) == 1).all().all()
-
-    # Check that hashing is consistent for small dataframe sizes (where df.col.nunique() < num_buckets)
-    df_sample = df.iloc[[0, 7]]
-    hashed_sample = _hash_bucket(df_sample, [col], num_buckets)
-    expected = hashed.loc[df_sample.index]
-    pdt.assert_frame_equal(expected, hashed_sample)
-
-
-def test_hashing_determinism():
-    """Make sure that the hashing algorithm used by pandas is independent of any context variables"""
-    df = pd.DataFrame({"range": np.arange(10)})
-    hashed = _hash_bucket(df, ["range"], 5)
-    expected = pd.DataFrame(
-        {
-            "range": [0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
-            _KTK_HASH_BUCKET: np.uint8([0, 0, 1, 2, 0, 3, 2, 0, 1, 4]),
-        }
-    )
-    pdt.assert_frame_equal(hashed, expected)
-
-
 @pytest.fixture
 def bound_update_dataset():
     return _update_dataset
@@ -81,234 +40,14 @@ def _return_none():
     return None
 
-
-@pytest.mark.parametrize("bucket_by", [None, "range"])
-def test_update_shuffle_no_partition_on(store_factory, bucket_by):
-    df = pd.DataFrame(
-        {
-            "range": np.arange(10),
-            "range_duplicated": np.repeat(np.arange(2), 5),
-            "random": np.random.randint(0, 100, 10),
-        }
-    )
-    ddf = dd.from_pandas(df, npartitions=10)
-
-    with pytest.raises(
-        ValueError, match="``num_buckets`` must not be None when shuffling data."
-    ):
-        update_dataset_from_ddf(
-            ddf,
-            store_factory,
-            dataset_uuid="output_dataset_uuid",
-            table="table",
-            shuffle=True,
-            num_buckets=None,
-            bucket_by=bucket_by,
-        ).compute()
-
-    res_default = update_dataset_from_ddf(
-        ddf,
-        store_factory,
-        dataset_uuid="output_dataset_uuid_default",
-        table="table",
-        shuffle=True,
-        bucket_by=bucket_by,
-    ).compute()
-    assert len(res_default.partitions) == 1
-
-    res = update_dataset_from_ddf(
-        ddf,
-        store_factory,
-        dataset_uuid="output_dataset_uuid",
-        table="table",
-        shuffle=True,
-        num_buckets=2,
-        bucket_by=bucket_by,
-    ).compute()
-
-    assert len(res.partitions) == 2
-
-
-@pytest.mark.parametrize("unique_primaries", [1, 4])
-@pytest.mark.parametrize("unique_secondaries", [1, 3])
-@pytest.mark.parametrize("num_buckets", [1, 5])
-@pytest.mark.parametrize("repartition", [1, 2])
-@pytest.mark.parametrize("npartitions", [5, 10])
-@pytest.mark.parametrize("bucket_by", [None, "sorted_column"])
-def test_update_shuffle_buckets(
-    store_factory,
-    metadata_version,
-    unique_primaries,
-    unique_secondaries,
-    num_buckets,
-    repartition,
-    npartitions,
-    bucket_by,
-):
-    """
-    Assert that certain properties are always given for the output dataset
-    no matter how the input data distribution looks like
-
-    Properties to assert:
-    * All partitions have a unique value for its correspondent primary key
-    * number of partitions is at least one per unique partition value, at
-      most ``num_buckets`` per primary partition value.
-    * If we demand a column to be sorted it is per partition monotonic
-    """
-
-    primaries = np.arange(unique_primaries)
-    secondary = np.arange(unique_secondaries)
-    num_rows = 100
-    primaries = np.repeat(primaries, np.ceil(num_rows / unique_primaries))[:num_rows]
-    secondary = np.repeat(secondary, np.ceil(num_rows / unique_secondaries))[:num_rows]
-    # ensure that there is an unsorted column uncorrelated
-    # to the primary and secondary columns which can be sorted later on per partition
-    unsorted_column = np.repeat(np.arange(100 / 10), 10)
-    np.random.shuffle(unsorted_column)
-    np.random.shuffle(primaries)
-    np.random.shuffle(secondary)
-
-    df = pd.DataFrame(
-        {"primary": primaries, "secondary": secondary, "sorted_column": unsorted_column}
-    )
-    secondary_indices = ["secondary"]
-    expected_num_indices = 2  # One primary
-
-    # used for tests later on to
-    if bucket_by:
-        secondary_indices.append(bucket_by)
-        expected_num_indices = 3
-
-    # shuffle all rows. properties of result should be reproducible
-    df = df.sample(frac=1).reset_index(drop=True)
-    ddf = dd.from_pandas(df, npartitions=npartitions)
-
-    dataset_comp = update_dataset_from_ddf(
-        ddf,
-        store_factory,
-        dataset_uuid="output_dataset_uuid",
-        table="core",
-        secondary_indices=secondary_indices,
-        shuffle=True,
-        bucket_by=bucket_by,
-        repartition_ratio=repartition,
-        num_buckets=num_buckets,
-        sort_partitions_by="sorted_column",
-        default_metadata_version=metadata_version,
-        partition_on=["primary"],
-    )
-
-    s = pickle.dumps(dataset_comp, pickle.HIGHEST_PROTOCOL)
-    dataset_comp = pickle.loads(s)
-
-    dataset = dataset_comp.compute()
-    dataset = dataset.load_all_indices(store_factory())
-
-    assert len(dataset.partitions) <= num_buckets * unique_primaries
-    assert len(dataset.partitions) >= unique_primaries
-
-    assert len(dataset.indices) == expected_num_indices
-
-    assert set(dataset.indices["primary"].index_dct.keys()) == set(
-        range(unique_primaries)
-    )
-    assert (
-        list(map(lambda x: len(x), dataset.indices["primary"].index_dct.values()))
-        <= [num_buckets] * unique_primaries
-    )
-
-    assert set(dataset.indices["secondary"].index_dct.keys()) == set(
-        range(unique_secondaries)
-    )
-
-    assert set(dataset.table_meta["core"].names) == {
-        "primary",
-        "secondary",
-        "sorted_column",
-    }
-
-    factory = DatasetFactory("output_dataset_uuid", store_factory)
-    factory.load_all_indices()
-
-    if bucket_by:
-        ind_df = factory.get_indices_as_dataframe(["primary", bucket_by])
-
-        assert not ind_df.duplicated().any()
-
-    for data_dct in read_dataset_as_dataframes__iterator(
-        dataset_uuid=dataset.uuid, store=store_factory
-    ):
-        df = data_dct["core"]
-        assert len(df.primary.unique()) == 1
-        assert df.sorted_column.is_monotonic
-
-    # update the dataset
-    # do not use partition_on since it should be interfered from the existing dataset
-    tasks = update_dataset_from_ddf(
-        ddf,
-        store_factory,
-        dataset_uuid="output_dataset_uuid",
-        table="core",
-        shuffle=True,
-        repartition_ratio=repartition,
-        num_buckets=num_buckets,
-        sort_partitions_by="sorted_column",
-        default_metadata_version=metadata_version,
-        bucket_by=bucket_by,
-    )
-
-    s = pickle.dumps(tasks, pickle.HIGHEST_PROTOCOL)
-    tasks = pickle.loads(s)
-
-    updated_dataset = tasks.compute()
-
-    assert len(updated_dataset.partitions) == 2 * len(dataset.partitions)
-
-    # Not allowed to use different partition_on
-    with pytest.raises(
-        ValueError, match="Incompatible set of partition keys encountered."
-    ):
-        update_dataset_from_ddf(
-            ddf,
-            store_factory,
-            dataset_uuid="output_dataset_uuid",
-            table="core",
-            shuffle=True,
-            repartition_ratio=repartition,
-            partition_on=["sorted_column"],
-            num_buckets=num_buckets,
-            sort_partitions_by="sorted_column",
-            default_metadata_version=metadata_version,
-        )
-
-    # Not allowed to update with indices which do not yet exist in dataset
-    with pytest.raises(ValueError, match="indices"):
-        update_dataset_from_ddf(
-            ddf,
-            store_factory,
-            dataset_uuid="output_dataset_uuid",
-            table="core",
-            shuffle=True,
-            partition_on=["primary"],
-            repartition_ratio=repartition,
-            secondary_indices=["sorted_column"],
-            num_buckets=num_buckets,
-            sort_partitions_by="sorted_column",
-            default_metadata_version=metadata_version,
-        )
-
+
+def test_delayed_as_delete_scope(store_factory, df_all_types):
     # Check that delayed objects are allowed as delete scope.
     tasks = update_dataset_from_ddf(
-        None,
+        dd.from_pandas(df_all_types, npartitions=1),
         store_factory,
         dataset_uuid="output_dataset_uuid",
         table="core",
-        shuffle=True,
-        repartition_ratio=repartition,
-        num_buckets=num_buckets,
-        sort_partitions_by="sorted_column",
-        default_metadata_version=metadata_version,
         delete_scope=dask.delayed(_return_none)(),
-        bucket_by=bucket_by,
     )
 
     s = pickle.dumps(tasks, pickle.HIGHEST_PROTOCOL)
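The ``test_utils.py`` addition below exercises ``sort_values_categorical`` with more than one column. As a quick reference for the ordering the new test expects, the snippet uses plain pandas ordered categoricals; it only mirrors the assertion in the test and is not the kartothek helper itself.

```python
# Sketch of the multi-column ordering checked by the new test, pandas only.
import pandas as pd

df = pd.DataFrame(
    {"ColA": list("BBAC"), "ColB": list("ZAAZ"), "Payload": [1, 2, 3, 4]}
)

# Casting to ordered categoricals with lexically sorted categories makes
# `sort_values` order rows by (ColA, ColB), which is what
# `sort_values_categorical(df, ["ColA", "ColB"])` is expected to produce.
ordered = df.astype(
    {
        "ColA": pd.CategoricalDtype(categories=["A", "B", "C"], ordered=True),
        "ColB": pd.CategoricalDtype(categories=["A", "Z"], ordered=True),
    }
)

assert ordered.sort_values(by=["ColA", "ColB"])["Payload"].tolist() == [3, 2, 1, 4]
```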
diff --git a/tests/io_components/test_utils.py b/tests/io_components/test_utils.py
index 5d4942dd..34865184 100644
--- a/tests/io_components/test_utils.py
+++ b/tests/io_components/test_utils.py
@@ -218,6 +218,34 @@ def test_sort_cateogrical():
     assert all(sorted_df["cat"].cat.categories == sorted(categories))
 
 
+def test_sort_cateogrical_multiple_cols():
+    df = pd.DataFrame.from_records(
+        [
+            {"ColA": "B", "ColB": "Z", "Payload": 1},
+            {"ColA": "B", "ColB": "A", "Payload": 2},
+            {"ColA": "A", "ColB": "A", "Payload": 3},
+            {"ColA": "C", "ColB": "Z", "Payload": 4},
+        ]
+    )
+    df_expected = df.copy().sort_values(by=["ColA", "ColB"]).reset_index(drop=True)
+    df = df.astype({col: "category" for col in ["ColA", "ColB"]})
+    # Correct order
+    # {"ColA": "A", "ColB": "A", "Payload": 3},
+    # {"ColA": "B", "ColB": "A", "Payload": 2},
+    # {"ColA": "B", "ColB": "Z", "Payload": 1},
+    # {"ColA": "C", "ColB": "Z", "Payload": 4},
+    df_expected = df_expected.astype(
+        {
+            "ColA": pd.CategoricalDtype(categories=["A", "B", "C"], ordered=True),
+            "ColB": pd.CategoricalDtype(categories=["A", "Z"], ordered=True),
+        }
+    )
+
+    sorted_df = sort_values_categorical(df, ["ColA", "ColB"])
+
+    pdt.assert_frame_equal(sorted_df, df_expected)
+
+
 def test_sort_categorical_pyarrow_conversion():
     """
     Make sure sorting does not introduce indices that end up in the Arrow table.