Merge pull request #147 from Kshitij68/Fix_bug#47_creating_dataset_with_non_existing_column_does_not_raise

Creating a dataset with a non-existing column now raises an error
fjetter authored Oct 21, 2019
2 parents 9a5c040 + 5b84de8 commit 988e05c
Showing 5 changed files with 56 additions and 35 deletions.
1 change: 1 addition & 0 deletions CHANGES.rst
@@ -31,6 +31,7 @@ Version 3.X.X (2019-09-XX)
 * `~kartothek.serialization.PredicatesType`
 * `~kartothek.serialization.ConjunctionType`
 * `~kartothek.serialization.LiteralType`
+- Creating a dataset with non-existing columns as an explicit index now raises a ``ValueError``
 
 Internal changes
 ^^^^^^^^^^^^^^^^
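For context, a minimal sketch of the behavior this changelog entry describes, adapted from the test added in this PR (the in-memory `store_factory` via storefact is an assumption of the sketch, not part of the commit):

    from functools import partial

    import pandas as pd
    import pytest
    from storefact import get_store_from_url

    from kartothek.core.index import ExplicitSecondaryIndex
    from kartothek.io.eager import store_dataframes_as_dataset

    # Assumed store factory for illustration: an in-memory store.
    store_factory = partial(get_store_from_url, "hmemory://")

    # The partition's DataFrame only has column "p", but the explicit
    # index refers to a column "x" that does not exist.
    partitions = [
        {
            "label": "part1",
            "data": [("core", pd.DataFrame({"p": [1, 2]}))],
            "indices": {"x": ExplicitSecondaryIndex("x", {1: ["part1"]})},
        }
    ]

    # After this change, the write fails fast instead of silently creating
    # a dataset with a dangling index.
    with pytest.raises(ValueError, match="no column corresponding to index x"):
        store_dataframes_as_dataset(
            dfs=partitions, store=store_factory, dataset_uuid="dataset_uuid"
        )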
8 changes: 2 additions & 6 deletions kartothek/io/testing/utils.py
@@ -22,16 +22,12 @@ def create_dataset(dataset_uuid, store_factory, metadata_version):
         {
             "label": "cluster_1",
             "data": [(SINGLE_TABLE, df.copy(deep=True)), ("helper", df_helper)],
-            "indices": {
-                SINGLE_TABLE: {val: ["cluster_2"] for val in df.TARGET.unique()}
-            },
+            "indices": {"P": {val: ["cluster_2"] for val in df.TARGET.unique()}},
         },
         {
             "label": "cluster_2",
             "data": [(SINGLE_TABLE, df.copy(deep=True)), ("helper", df_helper)],
-            "indices": {
-                SINGLE_TABLE: {val: ["cluster_2"] for val in df.TARGET.unique()}
-            },
+            "indices": {"P": {val: ["cluster_2"] for val in df.TARGET.unique()}},
         },
     ]

39 changes: 10 additions & 29 deletions kartothek/io_components/metapartition.py
@@ -33,7 +33,11 @@
 from kartothek.core.urlencode import decode_key, quote_indices
 from kartothek.core.utils import ensure_string_type, verify_metadata_version
 from kartothek.core.uuid import gen_uuid
-from kartothek.io_components.utils import _instantiate_store, combine_metadata
+from kartothek.io_components.utils import (
+    _ensure_valid_indices,
+    _instantiate_store,
+    combine_metadata,
+)
 from kartothek.serialization import (
     DataFrameSerializer,
     default_serializer,
@@ -1575,9 +1579,9 @@ def parse_input_to_metapartition(
         # A partition with explicit label, no metadata, one table and index information
         input_obj = {
             'label': 'partition_label',
-            'data': [('table', pd.DataFrame())],
+            'data': [('table', pd.DataFrame([{'column_1':values_1, 'column_2':values_2}]))],
             'indices': {
-                "column": {
+                "column_1": {
                     value: ['partition_label']
                 }
             }
@@ -1674,11 +1678,9 @@ def parse_input_to_metapartition(
                 'Use the `secondary_indices` keyword argument of "write" and "update" functions instead.',
                 DeprecationWarning,
             )
-            if expected_secondary_indices not in (False, None):
-                # Validate explicit input of indices
-                _ensure_valid_indices(
-                    secondary_indices=expected_secondary_indices, mp_indices=indices
-                )
+            _ensure_valid_indices(
+                mp_indices=indices, secondary_indices=expected_secondary_indices, data=data
+            )
 
             mp = MetaPartition(
                 # TODO: Deterministic hash for the input?
@@ -1699,24 +1701,3 @@ def parse_input_to_metapartition(
         raise ValueError("Unexpected type: {}".format(type(obj)))
 
     return mp
-
-
-def _ensure_valid_indices(secondary_indices, mp_indices):
-    if not secondary_indices:
-        if mp_indices:
-            raise ValueError(
-                "Incorrect indices provided for dataset.\n"
-                f"Expected index columns: {secondary_indices}"
-                f"Provided index: {mp_indices}"
-            )
-    else:
-        secondary_indices = set(secondary_indices)
-        # If the dataset has `secondary_indices` defined, then these indices will be build later so there is no need to
-        # ensure that they are also defined here (on a partition level).
-        # Hence, we just check that no new indices are defined on the partition level.
-        if not secondary_indices.issuperset(mp_indices.keys()):
-            raise ValueError(
-                "Incorrect indices provided for dataset.\n"
-                f"Expected index columns: {secondary_indices}"
-                f"Provided index: {mp_indices}"
-            )
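The call-site change above means `_ensure_valid_indices` now runs unconditionally and receives the partition data, so an explicit index on a missing column is rejected even when `expected_secondary_indices` is False. A rough sketch against the dict input shown in the docstring (the `metadata_version=4` keyword is an assumption for illustration):

    import pandas as pd

    from kartothek.io_components.metapartition import parse_input_to_metapartition

    df = pd.DataFrame({"column_1": [1, 2]})

    # Index on an existing column: parses fine (explicit indices still
    # emit the DeprecationWarning shown in the hunk above).
    mp = parse_input_to_metapartition(
        {
            "label": "partition_label",
            "data": [("table", df)],
            "indices": {"column_1": {1: ["partition_label"]}},
        },
        metadata_version=4,
    )

    # Index on a column the DataFrame lacks: now raises a ValueError from
    # _ensure_valid_indices instead of being accepted silently.
    bad = {
        "label": "partition_label",
        "data": [("table", df)],
        "indices": {"missing": {1: ["partition_label"]}},
    }
    # parse_input_to_metapartition(bad, metadata_version=4)  # -> ValueError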
22 changes: 22 additions & 0 deletions kartothek/io_components/utils.py
@@ -128,6 +128,28 @@ def _ensure_compatible_indices(dataset, secondary_indices):
     return secondary_indices or False
 
 
+def _ensure_valid_indices(mp_indices, secondary_indices=None, data=None):
+    # TODO (Kshitij68): Behavior closely matches `_ensure_compatible_indices`. Refactoring could prove helpful.
+    if data:
+        for table_name in data:
+            for index in mp_indices.keys():
+                if index not in data[table_name].columns:
+                    raise ValueError(
+                        f"In table {table_name}, no column corresponding to index {index}"
+                    )
+    if secondary_indices not in (False, None):
+        secondary_indices = set(secondary_indices)
+        # If the dataset has `secondary_indices` defined, then these indices will be built later, so there is
+        # no need to ensure that they are also defined here (on a partition level).
+        # Hence, we just check that no new indices are defined on the partition level.
+        if not secondary_indices.issuperset(mp_indices.keys()):
+            raise ValueError(
+                "Incorrect indices provided for dataset.\n"
+                f"Expected index columns: {secondary_indices}\n"
+                f"Provided index: {mp_indices}"
+            )
+
+
 def validate_partition_keys(
     dataset_uuid,
     store,
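A minimal sketch of the new helper's two checks in isolation (only the keys of `mp_indices` matter to the checks, so the index values here are placeholder dicts):

    import pandas as pd

    from kartothek.io_components.utils import _ensure_valid_indices

    data = {"core": pd.DataFrame({"p": [1, 2]})}

    # Check 1: every requested index column must exist in every table.
    _ensure_valid_indices(mp_indices={"p": {}}, data=data)  # passes
    try:
        _ensure_valid_indices(mp_indices={"x": {}}, data=data)
    except ValueError as exc:
        print(exc)  # In table core, no column corresponding to index x

    # Check 2: partition-level indices must not introduce columns beyond
    # the dataset's declared secondary indices.
    try:
        _ensure_valid_indices(mp_indices={"p": {}}, secondary_indices=["q"], data=data)
    except ValueError as exc:
        print(exc)  # Incorrect indices provided for dataset. ...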
21 changes: 21 additions & 0 deletions tests/io/eager/test_write.py
@@ -7,6 +7,7 @@
 
 from kartothek.core.common_metadata import make_meta, read_schema_metadata
 from kartothek.core.dataset import DatasetMetadata
+from kartothek.core.index import ExplicitSecondaryIndex
 from kartothek.core.uuid import gen_uuid
 from kartothek.io.eager import (
     create_empty_dataset_header,
@@ -289,3 +290,23 @@ def test_write_single_partition_different_partitioning(store_factory):
         store=store_factory, dataset_uuid=dataset.uuid, data=new_data
     )
     assert initial_keys + 2 == len(list(store_factory().keys()))
+
+
+def test_store_dataframes_as_dataset_does_not_allow_invalid_indices(store_factory):
+    partitions = [
+        {
+            "label": "part1",
+            "data": [("core", pd.DataFrame({"p": [1, 2]}))],
+            "indices": {"x": ExplicitSecondaryIndex("x", {1: ["part1"], 2: ["part2"]})},
+        }
+    ]
+
+    with pytest.raises(
+        ValueError, match="In table core, no column corresponding to index x"
+    ):
+        store_dataframes_as_dataset(
+            dfs=partitions,
+            store=store_factory,
+            metadata={"dataset": "metadata"},
+            dataset_uuid="dataset_uuid",
+        )
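To run just this new test against a development checkout (the command is an assumption based on standard pytest usage, not part of the commit):

    pytest tests/io/eager/test_write.py -k does_not_allow_invalid_indices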
