Commit

Refactor dispatch logic
fjetter committed Oct 18, 2019
1 parent 43765fa commit 6e0803c
Showing 9 changed files with 366 additions and 183 deletions.
9 changes: 9 additions & 0 deletions CHANGES.rst
@@ -20,6 +20,15 @@ Version 3.X.X (2019-09-XX)
messages will be less verbose in these cases as before.
- Add support for pyarrow 0.15.0
- Remove support for pyarrow < 0.13.0
- Fix an issue where the `dispatch_by` keyword would disable partition pruning
- Additional functions in `kartothek.serialization` module for dealing with predicates (usage sketch below)
* :func:`~kartothek.serialization.check_predicates`
* :func:`~kartothek.serialization.filter_predicates_by_column`
* :func:`~kartothek.serialization.columns_in_predicates`
- Added available types for type annotation when dealing with predicates
* `~kartothek.serialization.PredicatesType`
* `~kartothek.serialization.ConjunctionType`
* `~kartothek.serialization.LiteralType`
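
The helpers listed above operate on kartothek's predicate format: a list of conjunctions, where each conjunction is a list of `(column, op, value)` literals and the conjunctions are OR-ed together. A minimal usage sketch (illustrative only; the return values noted in the comments are assumptions, not taken from this commit):

from kartothek.serialization import (
    check_predicates,
    columns_in_predicates,
    filter_predicates_by_column,
)

# "(A > 1 AND C < 3) OR (B == 10)"
predicates = [[("A", ">", 1), ("C", "<", 3)], [("B", "==", 10)]]

check_predicates(predicates)              # validates the structure
cols = columns_in_predicates(predicates)  # columns referenced, e.g. {"A", "B", "C"}

# Keep only the literals that reference the given columns.
reduced = filter_predicates_by_column(predicates, ["A", "B"])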


Version 3.4.0 (2019-09-17)
64 changes: 51 additions & 13 deletions kartothek/core/dataset.py
@@ -25,9 +25,12 @@
from kartothek.core.partition import Partition
from kartothek.core.urlencode import decode_key, quote_indices
from kartothek.core.utils import verify_metadata_version
from kartothek.serialization import PredicatesType, columns_in_predicates

_logger = logging.getLogger(__name__)

TableMetaType = Dict[str, SchemaWrapper]


def _validate_uuid(uuid: str) -> bool:
return re.match(r"[a-zA-Z0-9+\-_]+$", uuid) is not None
@@ -340,14 +343,18 @@ def load_partition_indices(self) -> "DatasetMetadataBase":
partitions=self.partitions,
table_meta=self.table_meta,
default_dtype=pa.string() if self.metadata_version == 3 else None,
partition_keys=self.partition_keys,
)
combined_indices = self.indices.copy()
combined_indices.update(indices)
return self.copy(indices=combined_indices)

def get_indices_as_dataframe(
self, columns: Optional[List[str]] = None, date_as_object: bool = True
) -> pd.DataFrame:
self,
columns: Optional[List[str]] = None,
date_as_object: bool = True,
predicates: PredicatesType = None,
):
"""
Converts the dataset indices to a pandas dataframe.
@@ -371,10 +378,19 @@
"""
if columns is None:
columns = sorted(self.indices.keys())
elif columns == []:
return pd.DataFrame(index=self.partitions)

result = None
dfs = []
for col in columns:
columns_to_scan = columns[:]
if predicates:
predicate_columns = columns_in_predicates(predicates)
# Don't use set logic to preserve order
for col in predicate_columns:
if col not in columns_to_scan and col in self.indices:
columns_to_scan.append(col)

for col in columns_to_scan:
if col not in self.indices:
if col in self.partition_keys:
raise RuntimeError(
@@ -383,19 +399,30 @@ def get_indices_as_dataframe(
raise ValueError("Index `{}` unknown.".format(col))
df = pd.DataFrame(
self.indices[col].as_flat_series(
partitions_as_index=True, date_as_object=date_as_object
partitions_as_index=True,
date_as_object=date_as_object,
predicates=predicates,
)
)
dfs.append(df)

# start joining with the small ones
for df in sorted(dfs, key=lambda df: len(df)):
if result is None:
result = df
continue
sorted_dfs = sorted(dfs, key=lambda df: len(df))
result = sorted_dfs.pop(0)
for df in sorted_dfs:
result = result.merge(df, left_index=True, right_index=True, copy=False)

return result
if predicates:
index_name = result.index.name
result = (
result.loc[:, columns]
.reset_index()
.drop_duplicates()
.set_index(index_name)
)
return result
else:
return result


class DatasetMetadata(DatasetMetadataBase):
@@ -618,13 +645,24 @@ def _get_type_from_meta(
)


def _empty_partition_indices(
partition_keys: List[str], table_meta: TableMetaType, default_dtype: pa.DataType
):
indices = {}
for col in partition_keys:
arrow_type = _get_type_from_meta(table_meta, col, default_dtype)
indices[col] = PartitionIndex(column=col, index_dct={}, dtype=arrow_type)
return indices


def _construct_dynamic_index_from_partitions(
partitions: Dict[str, Partition],
table_meta: Dict[str, SchemaWrapper],
table_meta: TableMetaType,
default_dtype: pa.DataType,
partition_keys: List[str],
) -> Dict[str, PartitionIndex]:
if len(partitions) == 0:
return {}
return _empty_partition_indices(partition_keys, table_meta, default_dtype)

def _get_files(part):
if isinstance(part, dict):
@@ -638,7 +676,7 @@ def _get_files(part):
) # partitions is NOT empty here, see check above
first_partition_files = _get_files(first_partition)
if not first_partition_files:
return {}
return _empty_partition_indices(partition_keys, table_meta, default_dtype)
key_table = next(iter(first_partition_files.keys()))
storage_keys = (
(key, _get_files(part)[key_table]) for key, part in partitions.items()
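
For context on the get_indices_as_dataframe changes above: the method now also scans index columns that are referenced only by the predicates, joins the per-column index frames starting with the smallest one, and, when predicates were given, restricts the result to the requested columns and drops the duplicate rows this leaves behind. A self-contained pandas sketch of that join-and-restrict pattern on toy data (not kartothek's actual internals):

import pandas as pd

# One frame per index column, with the partition label as the frame's index.
df_a = pd.DataFrame(
    {"A": [1, 1, 2]}, index=pd.Index(["p1", "p2", "p3"], name="partition")
)
df_c = pd.DataFrame(
    {"C": [1, 3, 1, 2]},
    index=pd.Index(["p1", "p1", "p2", "p3"], name="partition"),
)

# Join starting with the smallest frame to keep intermediate results small.
dfs = sorted([df_a, df_c], key=len)
result = dfs.pop(0)
for df in dfs:
    result = result.merge(df, left_index=True, right_index=True, copy=False)

# "C" was only scanned for filtering: restrict to the requested columns and
# drop the duplicate (partition, value) rows the merge created.
requested = ["A"]
restricted = (
    result.loc[:, requested]
    .reset_index()
    .drop_duplicates()
    .set_index("partition")  # the real code uses result.index.name here
)
print(restricted)  # p1, p2 and p3 each appear once with their "A" value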
48 changes: 41 additions & 7 deletions kartothek/core/index.py
@@ -16,7 +16,13 @@
from kartothek.core._mixins import CopyMixin
from kartothek.core.common_metadata import normalize_type
from kartothek.core.urlencode import quote
from kartothek.serialization import filter_array_like
from kartothek.serialization import (
PredicatesType,
check_predicates,
filter_array_like,
filter_df_from_predicates,
filter_predicates_by_column,
)
from kartothek.serialization._parquet import _fix_pyarrow_07992_table

ValueType = TypeVar("ValueType")
@@ -422,24 +428,48 @@ def __ne__(self, other) -> bool:
return not (self == other)

def as_flat_series(
self, compact=False, partitions_as_index=False, date_as_object=True
self,
compact: bool = False,
partitions_as_index: bool = False,
date_as_object: bool = False,
predicates: PredicatesType = None,
):
"""
Convert the Index object to a pandas.Series
Parameters
----------
compact: bool, optional
compact:
If True, the index will be unique and the Series.values will be a list of partitions/values
partitions_as_index: bool, optional
partitions_as_index:
If True, the relation between index values and partitions will be reverted for the output
date_as_object: bool, optional
Cast dates to objects.
predicates:
A list of predicates. If a literal within the provided predicates
references a column which is not part of this index, this literal is
interpreted as True.
"""
check_predicates(predicates)
table = _index_dct_to_table(
self.index_dct, column=self.column, dtype=self.dtype
)
df = table.to_pandas(date_as_object=date_as_object)

if predicates is not None:
# If there is a conjunction without any reference to the index
# column the entire predicates expression is evaluated to True. In
# this case we do not need to filter the dataframe anymore
for conjunction in predicates:
new_conjunction = filter_predicates_by_column(
[conjunction], [self.column]
)
if new_conjunction is None:
break
else:
filtered_predicates = filter_predicates_by_column(
predicates, [self.column]
)
df = filter_df_from_predicates(df, predicates=filtered_predicates)

result_column = _PARTITION_COLUMN_NAME
# This is the way the dictionary is directly translated
# value: [partition]
@@ -451,9 +481,13 @@
# value: part_2
# value2: part_1
if partitions_as_index or not compact:
keys = np.concatenate(df[_PARTITION_COLUMN_NAME].values)
if len(df) == 0:
keys = np.array([], dtype=df[_PARTITION_COLUMN_NAME].values.dtype)
else:
keys = np.concatenate(df[_PARTITION_COLUMN_NAME].values)

lengths = df[_PARTITION_COLUMN_NAME].apply(len).values
lengths = lengths.astype(int)
values_index = np.repeat(np.arange(len(df)), lengths)
values = df[self.column].values[values_index]

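
The as_flat_series docstring above pins down the predicate semantics for a single-column index: literals that reference other columns are interpreted as True, and if any conjunction contains no reference to the index column at all, the whole expression can always be satisfied, so the index is not filtered. A minimal, self-contained sketch of that rule in plain Python (not the kartothek implementation, which routes through filter_predicates_by_column and filter_df_from_predicates):

import operator

OPS = {"==": operator.eq, "!=": operator.ne, ">": operator.gt,
       ">=": operator.ge, "<": operator.lt, "<=": operator.le}


def prune_index_values(index_column, values, predicates):
    # predicates: list of conjunctions, each a list of (column, op, value).
    # A conjunction with no literal on `index_column` can be satisfied
    # regardless of the index value, so nothing can be pruned.
    if any(all(col != index_column for col, _, _ in conj) for conj in predicates):
        return list(values)
    kept = []
    for value in values:
        for conj in predicates:
            # Literals on other columns are interpreted as True.
            if all(OPS[op](value, ref)
                   for col, op, ref in conj if col == index_column):
                kept.append(value)
                break
    return kept


print(prune_index_values("A", [1, 2, 3], [[("A", ">", 1)]]))
# -> [2, 3]: the index on "A" is filtered
print(prune_index_values("A", [1, 2, 3], [[("A", ">", 1)], [("Content", "==", "x")]]))
# -> [1, 2, 3]: the second conjunction never mentions "A", so no pruning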
106 changes: 70 additions & 36 deletions kartothek/io/testing/read.py
@@ -372,40 +372,22 @@ def test_read_dataset_as_dataframes_concat_primary(

@pytest.mark.parametrize("dispatch_by", ["A", "B", "C"])
def test_read_dataset_as_dataframes_dispatch_by_single_col(
store_factory,
store_session_factory,
dataset_dispatch_by,
bound_load_dataframes,
backend_identifier,
dispatch_by,
output_type,
metadata_version,
dataset_dispatch_by_uuid,
):
if output_type == "table":
pytest.skip()
cluster1 = pd.DataFrame(
{"A": [1, 1], "B": [10, 10], "C": [1, 2], "Content": ["cluster1", "cluster1"]}
)
cluster2 = pd.DataFrame(
{"A": [1, 1], "B": [10, 10], "C": [2, 3], "Content": ["cluster2", "cluster2"]}
)
cluster3 = pd.DataFrame({"A": [1], "B": [20], "C": [1], "Content": ["cluster3"]})
cluster4 = pd.DataFrame(
{"A": [2, 2], "B": [10, 10], "C": [1, 2], "Content": ["cluster4", "cluster4"]}
)
clusters = [cluster1, cluster2, cluster3, cluster4]
partitions = [{"data": [("data", c)]} for c in clusters]

store_dataframes_as_dataset__iter(
df_generator=partitions,
store=store_factory,
dataset_uuid="partitioned_uuid",
metadata_version=metadata_version,
partition_on=["A", "B"],
secondary_indices=["C"],
)

# Dispatch by primary index "A"
dispatched_a = bound_load_dataframes(
dataset_uuid="partitioned_uuid", store=store_factory, dispatch_by=[dispatch_by]
dataset_uuid=dataset_dispatch_by_uuid,
store=store_session_factory,
dispatch_by=[dispatch_by],
)

unique_a = set()
@@ -420,15 +402,17 @@ def test_read_dataset_as_dataframes_dispatch_by_single_col(
unique_a.add(unique_dispatch[0])


def test_read_dataset_as_dataframes_dispatch_by_multi_col(
store_factory,
bound_load_dataframes,
backend_identifier,
output_type,
metadata_version,
@pytest.fixture(scope="session")
def dataset_dispatch_by_uuid():
import uuid

return uuid.uuid1().hex


@pytest.fixture(scope="session")
def dataset_dispatch_by(
metadata_version, store_session_factory, dataset_dispatch_by_uuid
):
if output_type == "table":
pytest.skip()
cluster1 = pd.DataFrame(
{"A": [1, 1], "B": [10, 10], "C": [1, 2], "Content": ["cluster1", "cluster1"]}
)
@@ -440,20 +424,33 @@ def test_read_dataset_as_dataframes_dispatch_by_multi_col(
{"A": [2, 2], "B": [10, 10], "C": [1, 2], "Content": ["cluster4", "cluster4"]}
)
clusters = [cluster1, cluster2, cluster3, cluster4]

partitions = [{"data": [("data", c)]} for c in clusters]

store_dataframes_as_dataset__iter(
df_generator=partitions,
store=store_factory,
dataset_uuid="partitioned_uuid",
store=store_session_factory,
dataset_uuid=dataset_dispatch_by_uuid,
metadata_version=metadata_version,
partition_on=["A", "B"],
secondary_indices=["C"],
)
return pd.concat(clusters).sort_values(["A", "B", "C"]).reset_index(drop=True)


def test_read_dataset_as_dataframes_dispatch_by_multi_col(
store_session_factory,
bound_load_dataframes,
output_type,
dataset_dispatch_by,
dataset_dispatch_by_uuid,
):
if output_type == "table":
pytest.skip()
for dispatch_by in permutations(("A", "B", "C"), 2):
dispatched = bound_load_dataframes(
dataset_uuid="partitioned_uuid",
store=store_factory,
dataset_uuid=dataset_dispatch_by_uuid,
store=store_session_factory,
dispatch_by=dispatch_by,
)
uniques = pd.DataFrame(columns=dispatch_by)
@@ -469,6 +466,43 @@ def test_read_dataset_as_dataframes_dispatch_by_multi_col(
assert not any(uniques.duplicated())


@pytest.mark.parametrize(
"dispatch_by, predicates, expected_dispatches",
[
# This should only dispatch one partition since there is only
# one file with valid data points
(["A"], [[("C", ">", 2)]], 1),
# We dispatch and restrict to one value, i.e. one dispatch
(["B"], [[("B", "==", 10)]], 1),
# The same is true for a non-partition index col
(["C"], [[("C", "==", 1)]], 1),
# A condition where both primary and secondary indices need to work together
(["A", "C"], [[("A", ">", 1), ("C", "<", 3)]], 2),
],
)
def test_read_dispatch_by_with_predicates(
store_session_factory,
dataset_dispatch_by_uuid,
bound_load_dataframes,
dataset_dispatch_by,
dispatch_by,
output_type,
expected_dispatches,
predicates,
):
if output_type == "table":
pytest.skip()

dispatched = bound_load_dataframes(
dataset_uuid=dataset_dispatch_by_uuid,
store=store_session_factory,
dispatch_by=dispatch_by,
predicates=predicates,
)

assert len(dispatched) == expected_dispatches, dispatched
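
A rough sanity check for the expected_dispatches values above. Hedged simplification: real dispatching groups whole files via the dataset's partition and secondary indices, but for this fixture counting the distinct dispatch_by combinations among the predicate-filtered rows yields the same numbers:

import operator

import pandas as pd

OPS = {"==": operator.eq, "!=": operator.ne, ">": operator.gt,
       ">=": operator.ge, "<": operator.lt, "<=": operator.le}

# The four clusters written by the dataset_dispatch_by fixture ("Content" omitted).
data = pd.concat(
    [
        pd.DataFrame({"A": [1, 1], "B": [10, 10], "C": [1, 2]}),
        pd.DataFrame({"A": [1, 1], "B": [10, 10], "C": [2, 3]}),
        pd.DataFrame({"A": [1], "B": [20], "C": [1]}),
        pd.DataFrame({"A": [2, 2], "B": [10, 10], "C": [1, 2]}),
    ],
    ignore_index=True,
)


def expected_groups(dispatch_by, predicates):
    # OR over conjunctions, AND over the literals inside each conjunction.
    mask = pd.Series(False, index=data.index)
    for conjunction in predicates:
        conj_mask = pd.Series(True, index=data.index)
        for col, op, value in conjunction:
            conj_mask &= OPS[op](data[col], value)
        mask |= conj_mask
    return data[mask].groupby(dispatch_by).ngroups


assert expected_groups(["A"], [[("C", ">", 2)]]) == 1
assert expected_groups(["B"], [[("B", "==", 10)]]) == 1
assert expected_groups(["C"], [[("C", "==", 1)]]) == 1
assert expected_groups(["A", "C"], [[("A", ">", 1), ("C", "<", 3)]]) == 2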


def test_read_dataset_as_dataframes(
dataset,
store_session_factory,
