Add retries to restore dataframe (#408)
* add retries for ParquetSerializer.restore_dataframe

Co-authored-by: Nefta Kanilmaz <[email protected]>

* Update kartothek/serialization/_parquet.py

* Update kartothek/serialization/_parquet.py

* Update kartothek/serialization/_parquet.py

* make _restore_dataframe a staticmethod

* raise error from previous error

* add comments, make AssertionError into IOError

* custom error types

* Add tests for the retry mechanism

* Fix docs

* Add test counting the retries

Co-authored-by: lr4d <[email protected]>
Co-authored-by: Lucas Rademaker <[email protected]>
3 people authored Feb 11, 2021
1 parent 9c91e51 commit 5f80d74
Showing 4 changed files with 167 additions and 4 deletions.
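The diffs below wrap Parquet deserialization in a retry loop with exponential backoff. As a rough, standalone sketch of that pattern only (the names read_with_retries and flaky_read are placeholders, not part of the commit; the real implementation lives in kartothek/serialization/_parquet.py further down):

import time

MAX_NB_RETRIES = 6   # total number of attempts
BACKOFF_TIME = 0.01  # seconds before the first retry

def read_with_retries(flaky_read, *args, **kwargs):
    """Call ``flaky_read`` up to MAX_NB_RETRIES times with exponential backoff."""
    last_error = None
    for attempt in range(MAX_NB_RETRIES):
        try:
            return flaky_read(*args, **kwargs)
        except OSError as err:  # IOError is an alias of OSError on Python 3
            last_error = err
            if attempt < MAX_NB_RETRIES - 1:
                # 0.01 s, 0.02 s, 0.04 s, ... -- double the wait after every failure
                time.sleep(BACKOFF_TIME * 2 ** attempt)
    raise IOError("all read attempts failed") from last_error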
3 changes: 3 additions & 0 deletions CHANGES.rst
@@ -10,6 +10,9 @@ Version 3.19.0 (2021-02-XY)
state after the update
* Expose compression type and row group chunk size in Cube interface via optional
parameter of type :class:`~kartothek.serialization.ParquetSerializer`.
* Add retries to :func:`~kartothek.serialization._parquet.ParquetSerializer.restore_dataframe`.
IOErrors have been observed on long-running ktk + dask tasks. Until the root cause is fixed,
the serialization is retried to gain more stability.

Version 3.18.0 (2021-01-25)
===========================
12 changes: 10 additions & 2 deletions kartothek/serialization/_io_buffer.py
@@ -10,6 +10,14 @@
_logger = logging.getLogger(__name__)


class BufferReadError(IOError):
"""
Internal kartothek error while attempting to read from buffer
"""

pass


class BlockBuffer(io.BufferedIOBase):
"""
Block-based buffer.
@@ -111,7 +119,7 @@ def _fetch_blocks(self, block, n):
f"Expected raw read to return {size} bytes, but instead got {len(data)}"
)
_logger.error(err)
raise AssertionError(err)
raise BufferReadError(err)

# fill blocks
for i in range(n):
@@ -135,7 +143,7 @@ def _ensure_range_loaded(self, start, size):
if size < 0:
msg = f"Expected size >= 0, but got start={start}, size={size}"
_logger.error(msg)
raise AssertionError(msg)
raise BufferReadError(msg)

block = start // self._blocksize
offset = start % self._blocksize
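BufferReadError deliberately subclasses IOError, which is an alias of OSError on Python 3, so failures raised in the buffer layer are caught by the except OSError clause of the retry loop added to _parquet.py below. A quick standalone check of that relationship (assumes a kartothek installation that contains this commit):

from kartothek.serialization._io_buffer import BufferReadError

# IOError is an alias of OSError on Python 3, so the retry loop's
# ``except OSError`` clause also covers BufferReadError.
assert issubclass(BufferReadError, OSError)

try:
    raise BufferReadError("raw read returned fewer bytes than expected")
except OSError as err:
    print(f"caught and eligible for retry: {err!r}")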
70 changes: 68 additions & 2 deletions kartothek/serialization/_parquet.py
@@ -6,6 +6,8 @@


import datetime
import logging
import time
from typing import Iterable, Optional

import numpy as np
@@ -33,8 +35,12 @@
except ImportError:
HAVE_BOTO = False

_logger = logging.getLogger(__name__)


EPOCH_ORDINAL = datetime.date(1970, 1, 1).toordinal()
MAX_NB_RETRIES = 6 # longest retry backoff = BACKOFF_TIME * 2**(MAX_NB_RETRIES - 2)
BACKOFF_TIME = 0.01 # 10 ms


def _empty_table_from_schema(parquet_file):
@@ -65,6 +71,14 @@ def _reset_dictionary_columns(table, exclude=None):
return table


class ParquetReadError(IOError):
"""
Internal kartothek error while attempting to read Parquet file
"""

pass


class ParquetSerializer(DataFrameSerializer):
_PARQUET_VERSION = "2.0"
type_stable = True
@@ -98,7 +112,7 @@ def __repr__(self):
)

@staticmethod
def restore_dataframe(
def _restore_dataframe(
store: KeyValueStore,
key: str,
filter_query: Optional[str] = None,
@@ -107,7 +121,7 @@ def restore_dataframe(
categories: Optional[Iterable[str]] = None,
predicates: Optional[PredicatesType] = None,
date_as_object: bool = False,
):
) -> pd.DataFrame:
check_predicates(predicates)
# If we want to do columnar access we can benefit from partial reads
# otherwise full read en block is the better option.
@@ -187,6 +201,58 @@ def restore_dataframe(
else:
return df

@classmethod
def restore_dataframe(
cls,
store: KeyValueStore,
key: str,
filter_query: Optional[str] = None,
columns: Optional[Iterable[str]] = None,
predicate_pushdown_to_io: bool = True,
categories: Optional[Iterable[str]] = None,
predicates: Optional[PredicatesType] = None,
date_as_object: bool = False,
) -> pd.DataFrame:
# https://github.com/JDASoftwareGroup/kartothek/issues/407 We have been seeing weird `IOError`s while reading
# Parquet files from Azure Blob Store. These errors have caused long running computations to fail.
# The workaround is to retry the serialization here and gain more stability for long running tasks.
# This code should not live forever, it should be removed once the underlying cause has been resolved.
for nb_retry in range(MAX_NB_RETRIES):
try:
return cls._restore_dataframe(
store=store,
key=key,
filter_query=filter_query,
columns=columns,
predicate_pushdown_to_io=predicate_pushdown_to_io,
categories=categories,
predicates=predicates,
date_as_object=date_as_object,
)
# We only retry OSErrors (note that IOError is an alias of OSError on Python 3), as this kind of error
# may benefit from retries.
except OSError as err:
raised_error = err
_logger.warning(
msg=(
f"Failed to restore dataframe, attempt {nb_retry + 1} of {MAX_NB_RETRIES} with parameters "
f"key: {key}, filter_query: {filter_query}, columns: {columns}, "
f"predicate_pushdown_to_io: {predicate_pushdown_to_io}, categories: {categories}, "
f"predicates: {predicates}, date_as_object: {date_as_object}."
),
exc_info=True,
)
# Don't sleep after the last attempt; a ParquetReadError is raised below instead.
if nb_retry < (MAX_NB_RETRIES - 1):
time.sleep(BACKOFF_TIME * 2 ** nb_retry)

raise ParquetReadError(
f"Failed to restore dataframe after {MAX_NB_RETRIES} attempts. Parameters: "
f"key: {key}, filter_query: {filter_query}, columns: {columns}, "
f"predicate_pushdown_to_io: {predicate_pushdown_to_io}, categories: {categories}, "
f"date_as_object: {date_as_object}, predicates: {predicates}."
) from raised_error

def store(self, store, key_prefix, df):
key = "{}.parquet".format(key_prefix)
if isinstance(df, pa.Table):
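With MAX_NB_RETRIES = 6 and BACKOFF_TIME = 0.01, the loop above sleeps after each of the first five failed attempts and not after the last one, so the longest single backoff is 0.01 * 2**4 = 0.16 s and the retries add at most roughly 0.31 s of waiting. A small sketch that reproduces the schedule from the constants:

MAX_NB_RETRIES = 6
BACKOFF_TIME = 0.01  # 10 ms

# One sleep after every failed attempt except the last, doubling each time.
schedule = [BACKOFF_TIME * 2 ** attempt for attempt in range(MAX_NB_RETRIES - 1)]
print(schedule)                 # [0.01, 0.02, 0.04, 0.08, 0.16]
print(round(sum(schedule), 2))  # 0.31 -> worst-case extra latency in seconds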
86 changes: 86 additions & 0 deletions tests/serialization/test_parquet.py
@@ -11,6 +11,8 @@

from kartothek.serialization import DataFrameSerializer, ParquetSerializer
from kartothek.serialization._parquet import (
MAX_NB_RETRIES,
ParquetReadError,
_predicate_accepts,
_reset_dictionary_columns,
)
@@ -459,3 +461,87 @@ def test_reset_dict_cols(store):
only_a_reset = _reset_dictionary_columns(table, exclude=["colB"]).schema
assert not pa.types.is_dictionary(only_a_reset.field("col").type)
assert pa.types.is_dictionary(only_a_reset.field("colB").type)


def test_retry_on_IOError(monkeypatch, caplog, store):
"""
See https://github.com/JDASoftwareGroup/kartothek/issues/407 :
We are testing a retry-workaround for the above issue here. Once the issue is resolved,
this test and the workaround can be removed.
"""

df = pd.DataFrame({"A": [0, 1, 2, 3]})

retry_count = 0

def patched__restore_dataframe(**kwargs):
nonlocal retry_count
retry_count += 1

if retry_count == 1:
# fail on the first attempt
raise IOError()
else:
# simulate a successful retry
return df

monkeypatch.setattr(
ParquetSerializer, "_restore_dataframe", patched__restore_dataframe
)
serializer = ParquetSerializer()
key = serializer.store(store, "key", df)
df_result = serializer.restore_dataframe(store=store, key=key)
pdt.assert_frame_equal(df, df_result)


def test_retries_on_IOError_logs(monkeypatch, caplog, store):
"""
See https://github.com/JDASoftwareGroup/kartothek/issues/407 :
We are testing a retry-workaround for the above issue here. Once the issue is resolved,
this test and the workaround can be removed.
"""

def patched__restore_dataframe(**kwargs):
# This kind of exception should be captured by the retry mechanism.
raise IOError()

df = pd.DataFrame({"A": [0, 1, 2, 3]})
monkeypatch.setattr(
ParquetSerializer, "_restore_dataframe", patched__restore_dataframe
)
serializer = ParquetSerializer()
key = serializer.store(store, "key", df)

with pytest.raises(ParquetReadError):
serializer.restore_dataframe(store=store, key=key)

assert len(caplog.records) == MAX_NB_RETRIES
for log_record in caplog.records:
assert "Failed to restore dataframe" in log_record.message


def test_retry_fail_on_other_error(monkeypatch, caplog, store):
"""
See https://github.com/JDASoftwareGroup/kartothek/issues/407 :
We are testing a retry-workaround for the above issue here. Once the issue is resolved,
this test and the workaround can be removed.
We only want to retry on OSErrors (and inherited exceptions) -- all other exceptions should be raised.
"""

df = pd.DataFrame({"A": [0, 1, 2, 3]})

def patched__restore_dataframe(**kwargs):
# This should not be retried but raised immediately.
raise ValueError()

monkeypatch.setattr(
ParquetSerializer, "_restore_dataframe", patched__restore_dataframe
)
serializer = ParquetSerializer()
key = serializer.store(store, "key", df)

with pytest.raises(ValueError):
serializer.restore_dataframe(store=store, key=key)

assert len(caplog.records) == 0
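For reference, the public API exercised by these tests is unchanged: callers still store and restore through ParquetSerializer, and the retries happen transparently inside restore_dataframe. A minimal end-to-end usage sketch, assuming an in-memory simplekv DictStore in place of the test store fixture:

import pandas as pd
from simplekv.memory import DictStore  # stand-in for the store fixture, illustration only

from kartothek.serialization import ParquetSerializer

store = DictStore()
serializer = ParquetSerializer()

df = pd.DataFrame({"A": [0, 1, 2, 3]})
key = serializer.store(store, "some_prefix", df)  # writes "some_prefix.parquet"

# OSError/IOError during reading is retried internally; after MAX_NB_RETRIES
# failed attempts a ParquetReadError is raised instead.
restored = serializer.restore_dataframe(store=store, key=key)
assert restored.equals(df)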
