diff --git a/CHANGES.rst b/CHANGES.rst
index 691c2133..089cea37 100644
--- a/CHANGES.rst
+++ b/CHANGES.rst
@@ -31,6 +31,7 @@ Version 3.X.X (2019-09-XX)
 * `~kartothek.serialization.PredicatesType`
 * `~kartothek.serialization.ConjunctionType`
 * `~kartothek.serialization.LiteralType`
+- Creating a dataset with a non-existing column as an explicit index now raises a ``ValueError``
 
 Internal changes
 ^^^^^^^^^^^^^^^^
diff --git a/kartothek/io/testing/utils.py b/kartothek/io/testing/utils.py
index 1fba8cdc..7a1a562e 100644
--- a/kartothek/io/testing/utils.py
+++ b/kartothek/io/testing/utils.py
@@ -22,16 +22,12 @@ def create_dataset(dataset_uuid, store_factory, metadata_version):
         {
             "label": "cluster_1",
             "data": [(SINGLE_TABLE, df.copy(deep=True)), ("helper", df_helper)],
-            "indices": {
-                SINGLE_TABLE: {val: ["cluster_2"] for val in df.TARGET.unique()}
-            },
+            "indices": {"P": {val: ["cluster_2"] for val in df.TARGET.unique()}},
         },
         {
             "label": "cluster_2",
             "data": [(SINGLE_TABLE, df.copy(deep=True)), ("helper", df_helper)],
-            "indices": {
-                SINGLE_TABLE: {val: ["cluster_2"] for val in df.TARGET.unique()}
-            },
+            "indices": {"P": {val: ["cluster_2"] for val in df.TARGET.unique()}},
         },
     ]
 
diff --git a/kartothek/io_components/metapartition.py b/kartothek/io_components/metapartition.py
index 52db167d..a99069cb 100644
--- a/kartothek/io_components/metapartition.py
+++ b/kartothek/io_components/metapartition.py
@@ -33,7 +33,11 @@
 from kartothek.core.urlencode import decode_key, quote_indices
 from kartothek.core.utils import ensure_string_type, verify_metadata_version
 from kartothek.core.uuid import gen_uuid
-from kartothek.io_components.utils import _instantiate_store, combine_metadata
+from kartothek.io_components.utils import (
+    _ensure_valid_indices,
+    _instantiate_store,
+    combine_metadata,
+)
 from kartothek.serialization import (
     DataFrameSerializer,
     default_serializer,
@@ -1575,9 +1579,9 @@ def parse_input_to_metapartition(
         # A partition with explicit label, no metadata, one table and index information
         input_obj = {
             'label': 'partition_label',
-            'data': [('table', pd.DataFrame())],
+            'data': [('table', pd.DataFrame([{'column_1': values_1, 'column_2': values_2}]))],
             'indices': {
-                "column": {
+                "column_1": {
                     value: ['partition_label']
                 }
             }
@@ -1674,11 +1678,9 @@ def parse_input_to_metapartition(
                 'Use the `secondary_indices` keyword argument of "write" and "update" functions instead.',
                 DeprecationWarning,
             )
-            if expected_secondary_indices not in (False, None):
-                # Validate explicit input of indices
-                _ensure_valid_indices(
-                    secondary_indices=expected_secondary_indices, mp_indices=indices
-                )
+        _ensure_valid_indices(
+            mp_indices=indices, secondary_indices=expected_secondary_indices, data=data
+        )
 
         mp = MetaPartition(
             # TODO: Deterministic hash for the input?
@@ -1699,24 +1701,3 @@ def parse_input_to_metapartition(
         raise ValueError("Unexpected type: {}".format(type(obj)))
 
     return mp
-
-
-def _ensure_valid_indices(secondary_indices, mp_indices):
-    if not secondary_indices:
-        if mp_indices:
-            raise ValueError(
-                "Incorrect indices provided for dataset.\n"
-                f"Expected index columns: {secondary_indices}"
-                f"Provided index: {mp_indices}"
-            )
-    else:
-        secondary_indices = set(secondary_indices)
-        # If the dataset has `secondary_indices` defined, then these indices will be build later so there is no need to
-        # ensure that they are also defined here (on a partition level).
-        # Hence, we just check that no new indices are defined on the partition level.
-        if not secondary_indices.issuperset(mp_indices.keys()):
-            raise ValueError(
-                "Incorrect indices provided for dataset.\n"
-                f"Expected index columns: {secondary_indices}"
-                f"Provided index: {mp_indices}"
-            )
diff --git a/kartothek/io_components/utils.py b/kartothek/io_components/utils.py
index 44404c13..027fd3e3 100644
--- a/kartothek/io_components/utils.py
+++ b/kartothek/io_components/utils.py
@@ -128,6 +128,28 @@ def _ensure_compatible_indices(dataset, secondary_indices):
     return secondary_indices or False
 
 
+def _ensure_valid_indices(mp_indices, secondary_indices=None, data=None):
+    # TODO (Kshitij68): Behavior closely matches `_ensure_compatible_indices`. Refactoring could be helpful.
+    if data:
+        for table_name in data:
+            for index in mp_indices.keys():
+                if index not in data[table_name].columns:
+                    raise ValueError(
+                        f"In table {table_name}, no column corresponding to index {index}"
+                    )
+    if secondary_indices not in (False, None):
+        secondary_indices = set(secondary_indices)
+        # If the dataset has `secondary_indices` defined, then these indices will be built later so there is no need to
+        # ensure that they are also defined here (on a partition level).
+        # Hence, we just check that no new indices are defined on the partition level.
+        if not secondary_indices.issuperset(mp_indices.keys()):
+            raise ValueError(
+                "Incorrect indices provided for dataset.\n"
+                f"Expected index columns: {secondary_indices}\n"
+                f"Provided index: {mp_indices}"
+            )
+
+
 def validate_partition_keys(
     dataset_uuid,
     store,
diff --git a/tests/io/eager/test_write.py b/tests/io/eager/test_write.py
index b869e5d8..36c6098d 100644
--- a/tests/io/eager/test_write.py
+++ b/tests/io/eager/test_write.py
@@ -7,6 +7,7 @@
 
 from kartothek.core.common_metadata import make_meta, read_schema_metadata
 from kartothek.core.dataset import DatasetMetadata
+from kartothek.core.index import ExplicitSecondaryIndex
 from kartothek.core.uuid import gen_uuid
 from kartothek.io.eager import (
     create_empty_dataset_header,
@@ -289,3 +290,23 @@ def test_write_single_partition_different_partitioning(store_factory):
         store=store_factory, dataset_uuid=dataset.uuid, data=new_data
     )
     assert initial_keys + 2 == len(list(store_factory().keys()))
+
+
+def test_store_dataframes_as_dataset_does_not_allow_invalid_indices(store_factory):
+    partitions = [
+        {
+            "label": "part1",
+            "data": [("core", pd.DataFrame({"p": [1, 2]}))],
+            "indices": {"x": ExplicitSecondaryIndex("x", {1: ["part1"], 2: ["part2"]})},
+        }
+    ]
+
+    with pytest.raises(
+        ValueError, match="In table core, no column corresponding to index x"
+    ):
+        store_dataframes_as_dataset(
+            dfs=partitions,
+            store=store_factory,
+            metadata={"dataset": "metadata"},
+            dataset_uuid="dataset_uuid",
+        )
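
Usage sketch (illustrative only, mirroring the new test above): with this change, declaring an explicit index on a column that does not exist in the partition's tables fails at write time with the new ValueError. The snippet assumes a simplekv-style store factory named `store_factory`, as provided by kartothek's test fixtures; the table name "core" and index column "x" are arbitrary.

    import pandas as pd

    from kartothek.core.index import ExplicitSecondaryIndex
    from kartothek.io.eager import store_dataframes_as_dataset

    partitions = [
        {
            "label": "part1",
            # The only column in table "core" is "p" ...
            "data": [("core", pd.DataFrame({"p": [1, 2]}))],
            # ... but the explicit index refers to a column "x" that the table does not contain.
            "indices": {"x": ExplicitSecondaryIndex("x", {1: ["part1"], 2: ["part2"]})},
        }
    ]

    # Raises ValueError: "In table core, no column corresponding to index x"
    store_dataframes_as_dataset(
        dfs=partitions,
        store=store_factory,
        dataset_uuid="dataset_uuid",
    )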