diff --git a/CHANGES.rst b/CHANGES.rst
index e4448868..d08b7dc6 100644
--- a/CHANGES.rst
+++ b/CHANGES.rst
@@ -10,6 +10,9 @@ Version 3.19.0 (2021-02-XY)
   state after the update
 * Expose compression type and row group chunk size in Cube interface via optional
   parameter of type :class:`~kartothek.serialization.ParquetSerializer`.
+* Add retries to :meth:`~kartothek.serialization._parquet.ParquetSerializer.restore_dataframe`.
+  IOErrors have been observed on long-running ktk + dask tasks. Until the root cause is fixed,
+  the read is retried to gain more stability.
 
 Version 3.18.0 (2021-01-25)
 ===========================
diff --git a/kartothek/serialization/_io_buffer.py b/kartothek/serialization/_io_buffer.py
index cd6563d4..faa492e5 100644
--- a/kartothek/serialization/_io_buffer.py
+++ b/kartothek/serialization/_io_buffer.py
@@ -10,6 +10,14 @@
 _logger = logging.getLogger(__name__)
 
 
+class BufferReadError(IOError):
+    """
+    Internal kartothek error while attempting to read from buffer
+    """
+
+    pass
+
+
 class BlockBuffer(io.BufferedIOBase):
     """
     Block-based buffer.
@@ -111,7 +119,7 @@ def _fetch_blocks(self, block, n):
                 f"Expected raw read to return {size} bytes, but instead got {len(data)}"
             )
             _logger.error(err)
-            raise AssertionError(err)
+            raise BufferReadError(err)
 
         # fill blocks
         for i in range(n):
@@ -135,7 +143,7 @@ def _ensure_range_loaded(self, start, size):
         if size < 0:
             msg = f"Expected size >= 0, but got start={start}, size={size}"
             _logger.error(msg)
-            raise AssertionError(msg)
+            raise BufferReadError(msg)
 
         block = start // self._blocksize
         offset = start % self._blocksize
diff --git a/kartothek/serialization/_parquet.py b/kartothek/serialization/_parquet.py
index 9404c7a4..351bb469 100644
--- a/kartothek/serialization/_parquet.py
+++ b/kartothek/serialization/_parquet.py
@@ -6,6 +6,8 @@
 
 
 import datetime
+import logging
+import time
 from typing import Iterable, Optional
 
 import numpy as np
@@ -33,8 +35,12 @@
 except ImportError:
     HAVE_BOTO = False
 
+_logger = logging.getLogger(__name__)
+
 
 EPOCH_ORDINAL = datetime.date(1970, 1, 1).toordinal()
+MAX_NB_RETRIES = 6  # longest retry backoff = BACKOFF_TIME * 2**(MAX_NB_RETRIES - 2)
+BACKOFF_TIME = 0.01  # 10 ms
 
 
 def _empty_table_from_schema(parquet_file):
@@ -65,6 +71,14 @@ def _reset_dictionary_columns(table, exclude=None):
     return table
 
 
+class ParquetReadError(IOError):
+    """
+    Internal kartothek error while attempting to read Parquet file
+    """
+
+    pass
+
+
 class ParquetSerializer(DataFrameSerializer):
     _PARQUET_VERSION = "2.0"
     type_stable = True
@@ -98,7 +112,7 @@ def __repr__(self):
         )
 
     @staticmethod
-    def restore_dataframe(
+    def _restore_dataframe(
         store: KeyValueStore,
         key: str,
         filter_query: Optional[str] = None,
@@ -107,7 +121,7 @@ def restore_dataframe(
         categories: Optional[Iterable[str]] = None,
         predicates: Optional[PredicatesType] = None,
         date_as_object: bool = False,
-    ):
+    ) -> pd.DataFrame:
         check_predicates(predicates)
         # If we want to do columnar access we can benefit from partial reads
         # otherwise full read en block is the better option.
@@ -187,6 +201,58 @@ def restore_dataframe(
         else:
             return df
 
+    @classmethod
+    def restore_dataframe(
+        cls,
+        store: KeyValueStore,
+        key: str,
+        filter_query: Optional[str] = None,
+        columns: Optional[Iterable[str]] = None,
+        predicate_pushdown_to_io: bool = True,
+        categories: Optional[Iterable[str]] = None,
+        predicates: Optional[PredicatesType] = None,
+        date_as_object: bool = False,
+    ) -> pd.DataFrame:
+        # https://github.com/JDASoftwareGroup/kartothek/issues/407: We have been seeing spurious `IOError`s while
+        # reading Parquet files from Azure Blob Store. These errors have caused long-running computations to fail.
+        # The workaround is to retry the read here to gain more stability for long-running tasks.
+        # This code should not live forever; it should be removed once the underlying cause has been resolved.
+        for nb_retry in range(MAX_NB_RETRIES):
+            try:
+                return cls._restore_dataframe(
+                    store=store,
+                    key=key,
+                    filter_query=filter_query,
+                    columns=columns,
+                    predicate_pushdown_to_io=predicate_pushdown_to_io,
+                    categories=categories,
+                    predicates=predicates,
+                    date_as_object=date_as_object,
+                )
+            # We only retry OSErrors (note that IOError is an alias of OSError), as this kind of error may benefit
+            # from retries.
+            except OSError as err:
+                raised_error = err
+                _logger.warning(
+                    msg=(
+                        f"Failed to restore dataframe, attempt {nb_retry + 1} of {MAX_NB_RETRIES} with parameters "
+                        f"key: {key}, filter_query: {filter_query}, columns: {columns}, "
+                        f"predicate_pushdown_to_io: {predicate_pushdown_to_io}, categories: {categories}, "
+                        f"predicates: {predicates}, date_as_object: {date_as_object}."
+                    ),
+                    exc_info=True,
+                )
+                # Do not sleep after the final attempt.
+                if nb_retry < (MAX_NB_RETRIES - 1):
+                    time.sleep(BACKOFF_TIME * 2 ** nb_retry)
+
+        raise ParquetReadError(
+            f"Failed to restore dataframe after {MAX_NB_RETRIES} attempts. Parameters: "
+            f"key: {key}, filter_query: {filter_query}, columns: {columns}, "
+            f"predicate_pushdown_to_io: {predicate_pushdown_to_io}, categories: {categories}, "
+            f"date_as_object: {date_as_object}, predicates: {predicates}."
+        ) from raised_error
+
     def store(self, store, key_prefix, df):
         key = "{}.parquet".format(key_prefix)
         if isinstance(df, pa.Table):
diff --git a/tests/serialization/test_parquet.py b/tests/serialization/test_parquet.py
index 9869595b..cad38c8d 100644
--- a/tests/serialization/test_parquet.py
+++ b/tests/serialization/test_parquet.py
@@ -11,6 +11,8 @@
 
 from kartothek.serialization import DataFrameSerializer, ParquetSerializer
 from kartothek.serialization._parquet import (
+    MAX_NB_RETRIES,
+    ParquetReadError,
     _predicate_accepts,
     _reset_dictionary_columns,
 )
@@ -459,3 +461,87 @@ def test_reset_dict_cols(store):
     only_a_reset = _reset_dictionary_columns(table, exclude=["colB"]).schema
     assert not pa.types.is_dictionary(only_a_reset.field("col").type)
     assert pa.types.is_dictionary(only_a_reset.field("colB").type)
+
+
+def test_retry_on_IOError(monkeypatch, caplog, store):
+    """
+    See https://github.com/JDASoftwareGroup/kartothek/issues/407:
+    This tests the retry workaround for the above issue. Once the issue is resolved,
+    this test and the workaround can be removed.
+ """ + + df = pd.DataFrame({"A": [0, 1, 2, 3]}) + + retry_count = 0 + + def patched__restore_dataframe(**kwargs): + nonlocal retry_count + retry_count += 1 + + if not retry_count > 1: + # fail for the first try + raise IOError() + elif retry_count > 1: + # simulate a successful retry + return df + + monkeypatch.setattr( + ParquetSerializer, "_restore_dataframe", patched__restore_dataframe + ) + serializer = ParquetSerializer() + key = serializer.store(store, "key", df) + df_result = serializer.restore_dataframe(store=store, key=key) + pdt.assert_frame_equal(df, df_result) + + +def test_retries_on_IOError_logs(monkeypatch, caplog, store): + """ + See https://github.com/JDASoftwareGroup/kartothek/issues/407 : + We are testing a retry-workaround for the above issue here. Once the issue is resolved, + this test and the workaround can be removed. + """ + + def patched__restore_dataframe(**kwargs): + # This kind of exception should be captured by the retry mechanism. + raise IOError() + + df = pd.DataFrame({"A": [0, 1, 2, 3]}) + monkeypatch.setattr( + ParquetSerializer, "_restore_dataframe", patched__restore_dataframe + ) + serializer = ParquetSerializer() + key = serializer.store(store, "key", df) + + with pytest.raises(ParquetReadError): + serializer.restore_dataframe(store=store, key=key) + + assert len(caplog.records) == MAX_NB_RETRIES + for log_record in caplog.records: + assert "Failed to restore dataframe" in log_record.message + + +def test_retry_fail_on_other_error(monkeypatch, caplog, store): + """ + See https://github.com/JDASoftwareGroup/kartothek/issues/407 : + We are testing a retry-workaround for the above issue here. Once the issue is resolved, + this test and the workaround can be removed. + + We only want to retry on OSErrors (and inherited exceptions) -- all other exceptions should be raised. + """ + + df = pd.DataFrame({"A": [0, 1, 2, 3]}) + + def patched__restore_dataframe(**kwargs): + # This should not be retried but raised immediately. + raise ValueError() + + monkeypatch.setattr( + ParquetSerializer, "_restore_dataframe", patched__restore_dataframe + ) + serializer = ParquetSerializer() + key = serializer.store(store, "key", df) + + with pytest.raises(ValueError): + serializer.restore_dataframe(store=store, key=key) + + assert len(caplog.records) == 0