
Merge pull request #157 from fjetter/refactor_dispatch
Refactor dispatch logic
fjetter authored Oct 21, 2019
2 parents f5e82a6 + 7a2bbb5 commit 9a5c040
Showing 17 changed files with 381 additions and 195 deletions.
13 changes: 13 additions & 0 deletions CHANGES.rst
@@ -22,6 +22,19 @@ Version 3.X.X (2019-09-XX)
- Fix an issue where an empty dataframe of a partition in a multi-table dataset
would raise a schema validation exception
- Remove support for pyarrow < 0.13.0
- Fix an issue where the `dispatch_by` keyword would disable partition pruning
- Additional functions in the `kartothek.serialization` module for dealing with predicates
* :func:`~kartothek.serialization.check_predicates`
* :func:`~kartothek.serialization.filter_predicates_by_column`
* :func:`~kartothek.serialization.columns_in_predicates`
- Added available types for type annotation when dealing with predicates
* `~kartothek.serialization.PredicatesType`
* `~kartothek.serialization.ConjunctionType`
* `~kartothek.serialization.LiteralType`

Internal changes
^^^^^^^^^^^^^^^^
- Move the docs module from `io_components` to `core`


Version 3.4.0 (2019-09-17)
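The predicate helpers added to `kartothek.serialization` above operate on kartothek's predicate format: a list of conjunctions in disjunctive normal form, where each conjunction is a list of `(column, op, value)` literals. The following is a minimal sketch of how they fit together; the column names and values are invented for illustration, and the printed results are what the functions are expected to return based on how this commit uses them:

import pandas as pd

from kartothek.serialization import (
    PredicatesType,
    check_predicates,
    columns_in_predicates,
    filter_df_from_predicates,
    filter_predicates_by_column,
)

# (country == "DE" AND year >= 2019) OR (country == "US")
predicates: PredicatesType = [
    [("country", "==", "DE"), ("year", ">=", 2019)],
    [("country", "==", "US")],
]

check_predicates(predicates)  # raises if the structure is malformed

# Which columns does the expression reference?
print(columns_in_predicates(predicates))  # e.g. {"country", "year"}

# Keep only the literals that reference the given columns
print(filter_predicates_by_column(predicates, ["country"]))
# expected: [[("country", "==", "DE")], [("country", "==", "US")]]

# Filtering an in-memory dataframe with the same expression
df = pd.DataFrame({"country": ["DE", "US", "FR"], "year": [2019, 2017, 2020]})
print(filter_df_from_predicates(df, predicates=predicates))
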
71 changes: 53 additions & 18 deletions kartothek/core/dataset.py
@@ -15,6 +15,7 @@
from kartothek.core._compat import load_json
from kartothek.core._mixins import CopyMixin
from kartothek.core.common_metadata import SchemaWrapper, read_schema_metadata
from kartothek.core.docs import default_docs
from kartothek.core.index import (
ExplicitSecondaryIndex,
IndexBase,
@@ -25,9 +26,12 @@
from kartothek.core.partition import Partition
from kartothek.core.urlencode import decode_key, quote_indices
from kartothek.core.utils import verify_metadata_version
from kartothek.serialization import PredicatesType, columns_in_predicates

_logger = logging.getLogger(__name__)

TableMetaType = Dict[str, SchemaWrapper]


def _validate_uuid(uuid: str) -> bool:
return re.match(r"[a-zA-Z0-9+\-_]+$", uuid) is not None
@@ -340,14 +344,19 @@ def load_partition_indices(self) -> "DatasetMetadataBase":
partitions=self.partitions,
table_meta=self.table_meta,
default_dtype=pa.string() if self.metadata_version == 3 else None,
partition_keys=self.partition_keys,
)
combined_indices = self.indices.copy()
combined_indices.update(indices)
return self.copy(indices=combined_indices)

@default_docs
def get_indices_as_dataframe(
self, columns: Optional[List[str]] = None, date_as_object: bool = True
) -> pd.DataFrame:
self,
columns: Optional[List[str]] = None,
date_as_object: bool = True,
predicates: PredicatesType = None,
):
"""
Converts the dataset indices to a pandas dataframe.
@@ -363,18 +372,22 @@ def get_indices_as_dataframe(
Parameters
----------
columns: list of str
If provided, the dataframe will only be constructed for the provided columns/indices.
If `None` is given, all indices are included.
date_as_object: bool, optional
Cast dates to objects.
"""
if columns is None:
columns = sorted(self.indices.keys())
elif columns == []:
return pd.DataFrame(index=self.partitions)

result = None
dfs = []
for col in columns:
columns_to_scan = columns[:]
if predicates:
predicate_columns = columns_in_predicates(predicates)
# Don't use set logic to preserve order
for col in predicate_columns:
if col not in columns_to_scan and col in self.indices:
columns_to_scan.append(col)

for col in columns_to_scan:
if col not in self.indices:
if col in self.partition_keys:
raise RuntimeError(
@@ -383,19 +396,30 @@ def get_indices_as_dataframe(
raise ValueError("Index `{}` unknown.".format(col))
df = pd.DataFrame(
self.indices[col].as_flat_series(
partitions_as_index=True, date_as_object=date_as_object
partitions_as_index=True,
date_as_object=date_as_object,
predicates=predicates,
)
)
dfs.append(df)

# start joining with the small ones
for df in sorted(dfs, key=lambda df: len(df)):
if result is None:
result = df
continue
sorted_dfs = sorted(dfs, key=lambda df: len(df))
result = sorted_dfs.pop(0)
for df in sorted_dfs:
result = result.merge(df, left_index=True, right_index=True, copy=False)

return result
if predicates:
index_name = result.index.name
result = (
result.loc[:, columns]
.reset_index()
.drop_duplicates()
.set_index(index_name)
)
return result
else:
return result
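
To illustrate the new control flow in `get_indices_as_dataframe`, here is a self-contained pandas sketch (made-up partition labels and index values, no kartothek API involved) of what happens when predicates reference an index column the caller did not request: that column is scanned as well, the per-column frames are merged smallest-first on the partition label, and afterwards the predicate-only columns are dropped again and the remaining rows de-duplicated. The actual predicate evaluation, which the real method pushes down into each index, is omitted here:

import pandas as pd

# Stand-ins for self.indices[col].as_flat_series(partitions_as_index=True, ...):
# one row per (partition, index value) pair, indexed by partition label.
part = pd.Index(["p1", "p1", "p2"], name="partition")
product = pd.DataFrame({"product": [1, 2, 1]}, index=part)
country = pd.DataFrame({"country": ["DE", "FR", "US"]}, index=part)

requested_columns = ["product"]   # what the caller asked for
scanned = [product, country]      # "country" is scanned only because a predicate references it

# Merge smallest-first on the partition label (per-partition cross product)
dfs = sorted(scanned, key=len)
result = dfs.pop(0)
for df in dfs:
    result = result.merge(df, left_index=True, right_index=True, copy=False)

# Drop the predicate-only column again and de-duplicate the remaining rows
index_name = result.index.name
result = (
    result.loc[:, requested_columns]
    .reset_index()
    .drop_duplicates()
    .set_index(index_name)
)
print(result)  # p1 -> 1, p1 -> 2, p2 -> 1 (each pair appears only once)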


class DatasetMetadata(DatasetMetadataBase):
@@ -618,13 +642,24 @@ def _get_type_from_meta(
)


def _empty_partition_indices(
partition_keys: List[str], table_meta: TableMetaType, default_dtype: pa.DataType
):
indices = {}
for col in partition_keys:
arrow_type = _get_type_from_meta(table_meta, col, default_dtype)
indices[col] = PartitionIndex(column=col, index_dct={}, dtype=arrow_type)
return indices


def _construct_dynamic_index_from_partitions(
partitions: Dict[str, Partition],
table_meta: Dict[str, SchemaWrapper],
table_meta: TableMetaType,
default_dtype: pa.DataType,
partition_keys: List[str],
) -> Dict[str, PartitionIndex]:
if len(partitions) == 0:
return {}
return _empty_partition_indices(partition_keys, table_meta, default_dtype)

def _get_files(part):
if isinstance(part, dict):
@@ -638,7 +673,7 @@ def _get_files(part):
) # partitions is NOT empty here, see check above
first_partition_files = _get_files(first_partition)
if not first_partition_files:
return {}
return _empty_partition_indices(partition_keys, table_meta, default_dtype)
key_table = next(iter(first_partition_files.keys()))
storage_keys = (
(key, _get_files(part)[key_table]) for key, part in partitions.items()
File renamed without changes: kartothek/io_components/docs.py → kartothek/core/docs.py
50 changes: 43 additions & 7 deletions kartothek/core/index.py
@@ -15,8 +15,15 @@
from kartothek.core._compat import ARROW_LARGER_EQ_0150
from kartothek.core._mixins import CopyMixin
from kartothek.core.common_metadata import normalize_type
from kartothek.core.docs import default_docs
from kartothek.core.urlencode import quote
from kartothek.serialization import filter_array_like
from kartothek.serialization import (
PredicatesType,
check_predicates,
filter_array_like,
filter_df_from_predicates,
filter_predicates_by_column,
)
from kartothek.serialization._parquet import _fix_pyarrow_07992_table

ValueType = TypeVar("ValueType")
@@ -421,25 +428,50 @@ def __eq__(self, other) -> bool:
def __ne__(self, other) -> bool:
return not (self == other)

@default_docs
def as_flat_series(
self, compact=False, partitions_as_index=False, date_as_object=True
self,
compact: bool = False,
partitions_as_index: bool = False,
date_as_object: bool = False,
predicates: PredicatesType = None,
):
"""
Convert the Index object to a pandas.Series
Parameters
----------
compact: bool, optional
compact:
If True, the index will be unique and the Series.values will be a list of partitions/values
partitions_as_index: bool, optional
partitions_as_index:
If True, the relation between index values and partitions will be reverted for the output
date_as_object: bool, optional
Cast dates to objects.
predicates:
A list of predicates. If a literal within the provided predicates
references a column which is not part of this index, this literal is
interpreted as True.
"""
check_predicates(predicates)
table = _index_dct_to_table(
self.index_dct, column=self.column, dtype=self.dtype
)
df = table.to_pandas(date_as_object=date_as_object)

if predicates is not None:
# If there is a conjunction without any reference to the index
# column the entire predicates expression is evaluated to True. In
# this case we do not need to filter the dataframe anymore
for conjunction in predicates:
new_conjunction = filter_predicates_by_column(
[conjunction], [self.column]
)
if new_conjunction is None:
break
else:
filtered_predicates = filter_predicates_by_column(
predicates, [self.column]
)
df = filter_df_from_predicates(df, predicates=filtered_predicates)

result_column = _PARTITION_COLUMN_NAME
# This is the way the dictionary is directly translated
# value: [partition]
@@ -451,9 +483,13 @@ def as_flat_series(
# value: part_2
# value2: part_1
if partitions_as_index or not compact:
keys = np.concatenate(df[_PARTITION_COLUMN_NAME].values)
if len(df) == 0:
keys = np.array([], dtype=df[_PARTITION_COLUMN_NAME].values.dtype)
else:
keys = np.concatenate(df[_PARTITION_COLUMN_NAME].values)

lengths = df[_PARTITION_COLUMN_NAME].apply(len).values
lengths = lengths.astype(int)
values_index = np.repeat(np.arange(len(df)), lengths)
values = df[self.column].values[values_index]

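The `as_flat_series` docstring above states that predicate literals referencing a column which is not part of the index are interpreted as True. A small sketch of that behavior against an in-memory secondary index (invented column names, partition labels, and values; the expected output is an assumption based on the filtering logic shown in this diff):

from kartothek.core.index import ExplicitSecondaryIndex

idx = ExplicitSecondaryIndex(
    column="product",
    index_dct={1: ["part_a"], 2: ["part_b", "part_c"]},
)

# The "country" literal does not belong to this index and is treated as True,
# so only the "product" literal restricts the result.
ser = idx.as_flat_series(
    partitions_as_index=True,
    predicates=[[("product", "==", 2), ("country", "==", "DE")]],
)
print(ser)  # expected: partitions part_b and part_c, each mapped to product 2
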
2 changes: 1 addition & 1 deletion kartothek/io/dask/bag.py
@@ -5,6 +5,7 @@
import dask.bag as db

from kartothek.core import naming
from kartothek.core.docs import default_docs
from kartothek.core.factory import _ensure_factory
from kartothek.core.utils import _check_callable
from kartothek.core.uuid import gen_uuid
@@ -14,7 +15,6 @@
_identity,
_maybe_get_categoricals_from_index,
)
from kartothek.io_components.docs import default_docs
from kartothek.io_components.index import update_indices_from_partitions
from kartothek.io_components.metapartition import (
MetaPartition,
2 changes: 1 addition & 1 deletion kartothek/io/dask/dataframe.py
@@ -6,9 +6,9 @@
import numpy as np

from kartothek.core.common_metadata import empty_dataframe_from_schema
from kartothek.core.docs import default_docs
from kartothek.core.factory import _ensure_factory
from kartothek.core.naming import DEFAULT_METADATA_VERSION
from kartothek.io_components.docs import default_docs
from kartothek.io_components.metapartition import parse_input_to_metapartition
from kartothek.io_components.update import update_dataset_from_partitions
from kartothek.io_components.utils import (
2 changes: 1 addition & 1 deletion kartothek/io/dask/delayed.py
@@ -8,6 +8,7 @@
from dask import delayed

from kartothek.core import naming
from kartothek.core.docs import default_docs
from kartothek.core.factory import _ensure_factory
from kartothek.core.naming import DEFAULT_METADATA_VERSION
from kartothek.core.utils import _check_callable
@@ -17,7 +18,6 @@
delete_indices,
delete_top_level_metadata,
)
from kartothek.io_components.docs import default_docs
from kartothek.io_components.gc import delete_files, dispatch_files_to_gc
from kartothek.io_components.merge import align_datasets
from kartothek.io_components.metapartition import (
2 changes: 1 addition & 1 deletion kartothek/io/eager.py
@@ -11,6 +11,7 @@
store_schema_metadata,
)
from kartothek.core.dataset import DatasetMetadataBuilder
from kartothek.core.docs import default_docs
from kartothek.core.factory import _ensure_factory
from kartothek.core.naming import (
DEFAULT_METADATA_STORAGE_FORMAT,
@@ -24,7 +25,6 @@
delete_indices,
delete_top_level_metadata,
)
from kartothek.io_components.docs import default_docs
from kartothek.io_components.gc import delete_files, dispatch_files_to_gc
from kartothek.io_components.index import update_indices_from_partitions
from kartothek.io_components.metapartition import (
2 changes: 1 addition & 1 deletion kartothek/io/iter.py
@@ -2,13 +2,13 @@
from functools import partial
from typing import cast

from kartothek.core.docs import default_docs
from kartothek.core.factory import _ensure_factory
from kartothek.core.naming import (
DEFAULT_METADATA_STORAGE_FORMAT,
DEFAULT_METADATA_VERSION,
)
from kartothek.core.uuid import gen_uuid
from kartothek.io_components.docs import default_docs
from kartothek.io_components.metapartition import (
MetaPartition,
parse_input_to_metapartition,
(The diff for the remaining changed files was not loaded.)
