From 8f7642f432858c1e1b1d341173ff21aff1e469b1 Mon Sep 17 00:00:00 2001 From: Baz Date: Sun, 12 Mar 2023 00:20:33 +0200 Subject: [PATCH] =?UTF-8?q?=F0=9F=90=9B=20Source=20Iterable:=20improve=20r?= =?UTF-8?q?etry=20on=20`500=20-=20Generic=20Error`=20(#23938)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../resources/seed/source_definitions.yaml | 2 +- .../src/main/resources/seed/source_specs.yaml | 2 +- .../connectors/source-iterable/Dockerfile | 2 +- .../source_iterable/streams.py | 19 +++++++---- .../source-iterable/source_iterable/utils.py | 33 +++++++++++++++++++ .../source-iterable/unit_tests/conftest.py | 11 +++++++ .../unit_tests/test_streams.py | 17 ++++++++-- connectors.md | 2 +- docs/integrations/sources/iterable.md | 1 + 9 files changed, 75 insertions(+), 14 deletions(-) diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index 7f3f4da8281f..5bc4f0c375de 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -950,7 +950,7 @@ - name: Iterable sourceDefinitionId: 2e875208-0c0b-4ee4-9e92-1cb3156ea799 dockerRepository: airbyte/source-iterable - dockerImageTag: 0.1.25 + dockerImageTag: 0.1.26 documentationUrl: https://docs.airbyte.com/integrations/sources/iterable icon: iterable.svg sourceType: api diff --git a/airbyte-config/init/src/main/resources/seed/source_specs.yaml b/airbyte-config/init/src/main/resources/seed/source_specs.yaml index 5e5140406c50..b08a6d0e58e2 100644 --- a/airbyte-config/init/src/main/resources/seed/source_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_specs.yaml @@ -7003,7 +7003,7 @@ supportsNormalization: false supportsDBT: false supported_destination_sync_modes: [] -- dockerImage: "airbyte/source-iterable:0.1.25" +- dockerImage: "airbyte/source-iterable:0.1.26" spec: documentationUrl: "https://docs.airbyte.com/integrations/sources/iterable" connectionSpecification: diff --git a/airbyte-integrations/connectors/source-iterable/Dockerfile b/airbyte-integrations/connectors/source-iterable/Dockerfile index dce1bed31e4a..405e29fdf2ba 100644 --- a/airbyte-integrations/connectors/source-iterable/Dockerfile +++ b/airbyte-integrations/connectors/source-iterable/Dockerfile @@ -12,5 +12,5 @@ RUN pip install . ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] -LABEL io.airbyte.version=0.1.25 +LABEL io.airbyte.version=0.1.26 LABEL io.airbyte.name=airbyte/source-iterable diff --git a/airbyte-integrations/connectors/source-iterable/source_iterable/streams.py b/airbyte-integrations/connectors/source-iterable/source_iterable/streams.py index 8b8036645619..45ae0dee9c23 100644 --- a/airbyte-integrations/connectors/source-iterable/source_iterable/streams.py +++ b/airbyte-integrations/connectors/source-iterable/source_iterable/streams.py @@ -20,7 +20,7 @@ from requests import codes from requests.exceptions import ChunkedEncodingError from source_iterable.slice_generators import AdjustableSliceGenerator, RangeSliceGenerator, StreamSlice -from source_iterable.utils import dateutil_parse +from source_iterable.utils import IterableGenericErrorHandler, dateutil_parse EVENT_ROWS_LIMIT = 200 CAMPAIGNS_PER_REQUEST = 20 @@ -31,6 +31,8 @@ class IterableStream(HttpStream, ABC): # in case we get a 401 error (api token disabled or deleted) on a stream slice, do not make further requests within the current stream # to prevent 429 error on other streams ignore_further_slices = False + # to handle the Generic Errors (500 with msg pattern) + generic_error_handler: IterableGenericErrorHandler = IterableGenericErrorHandler() url_base = "https://api.iterable.com/api/" primary_key = "id" @@ -38,6 +40,9 @@ class IterableStream(HttpStream, ABC): def __init__(self, authenticator): self._cred = authenticator super().__init__(authenticator) + # placeholder for last slice used for API request + # to reuse it later in logs or whatever + self._last_slice: Mapping[str, Any] = {} @property def retry_factor(self) -> int: @@ -76,7 +81,7 @@ def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: if not self.check_unauthorized_key(response): return [] - response_json = response.json() + response_json = response.json() or {} records = response_json.get(self.data_field, []) for record in records: @@ -86,12 +91,10 @@ def should_retry(self, response: requests.Response) -> bool: # check the authentication if not self.check_unauthorized_key(response): return False - # retry on generic error 500 meaning + # retry on generic error 500 if response.status_code == 500: - if response.json().get("code") == "GenericError" and "Please try again later" in response.json().get("msg"): - self.logger.warn(f"Generic Server Error occured for stream: `{self.name}`.") - setattr(self, "raise_on_http_errors", False) - return True + # will retry for 2 times, then give up and skip the fetch for slice + return self.generic_error_handler.handle(response, self.name, self._last_slice) # all other cases return super().should_retry(response) @@ -104,6 +107,8 @@ def read_records( ) -> Iterable[Mapping[str, Any]]: if self.ignore_further_slices: return [] + # save last slice + self._last_slice = stream_slice yield from super().read_records(sync_mode, cursor_field=cursor_field, stream_slice=stream_slice, stream_state=stream_state) diff --git a/airbyte-integrations/connectors/source-iterable/source_iterable/utils.py b/airbyte-integrations/connectors/source-iterable/source_iterable/utils.py index 76ef3825128a..0cdfdaf25124 100644 --- a/airbyte-integrations/connectors/source-iterable/source_iterable/utils.py +++ b/airbyte-integrations/connectors/source-iterable/source_iterable/utils.py @@ -2,8 +2,12 @@ # Copyright (c) 2023 Airbyte, Inc., all rights reserved. # +import logging +from typing import Any, Mapping + import dateutil.parser import pendulum +import requests def dateutil_parse(text): @@ -22,3 +26,32 @@ def dateutil_parse(text): dt.microsecond, tz=dt.tzinfo or pendulum.tz.UTC, ) + + +class IterableGenericErrorHandler: + + logger = logging.getLogger("airbyte") + + error_count = 0 + max_retry = 2 + + def handle(self, response: requests.Response, stream_name: str, last_slice: Mapping[str, Any] = {}) -> bool: + # error pattern to check + code_pattern = "Generic Error" + msg_pattern = "Please try again later" + # prepare warning message + warning_msg = f"Generic Server Error occured for stream: `{stream_name}`. " + # For cases when there is a slice to go with, but server returns Generic Error - Please try again + # we reetry 2 times, then skipp the record and move on with warning message. + if response.json().get("code") == code_pattern and msg_pattern in response.json().get("msg"): + self.error_count += 1 + setattr(self, "raise_on_http_errors", False) + if self.error_count > self.max_retry: + self.logger.warn(warning_msg + f"Skip fetching for slice {last_slice}.") + return False + else: + self.logger.warn(warning_msg + f"Retrying for slice {last_slice}, attempt {self.error_count}") + return True + else: + # All other cases + return True diff --git a/airbyte-integrations/connectors/source-iterable/unit_tests/conftest.py b/airbyte-integrations/connectors/source-iterable/unit_tests/conftest.py index f71d8c8b7c89..e3648b9fec20 100644 --- a/airbyte-integrations/connectors/source-iterable/unit_tests/conftest.py +++ b/airbyte-integrations/connectors/source-iterable/unit_tests/conftest.py @@ -27,3 +27,14 @@ def config_fixture(): @pytest.fixture() def mock_lists_resp(mocker): mocker.patch("source_iterable.streams.Lists.read_records", return_value=iter([{"id": 1}, {"id": 2}])) + + +@pytest.fixture(name="lists_stream") +def lists_stream(): + # local imports + from airbyte_cdk.sources.streams.http.auth import NoAuth + from source_iterable.streams import Lists + + # return the instance of the stream so we could make global tests on it, + # to cover the different `should_retry` logic + return Lists(authenticator=NoAuth()) diff --git a/airbyte-integrations/connectors/source-iterable/unit_tests/test_streams.py b/airbyte-integrations/connectors/source-iterable/unit_tests/test_streams.py index e48a04006366..0f937411730b 100644 --- a/airbyte-integrations/connectors/source-iterable/unit_tests/test_streams.py +++ b/airbyte-integrations/connectors/source-iterable/unit_tests/test_streams.py @@ -164,11 +164,22 @@ def test_iterable_export_stream_backoff_time(): "status, json, expected", [ (429, {}, True), - (500, {"msg": "...Please try again later...", "code": "Generic Error"}, True) + # for 500 - Generic error we should make 2 retry attempts + # and give up on third one! + (500, {"msg": "...Please try again later...1", "code": "Generic Error"}, True), + (500, {"msg": "...Please try again later...2", "code": "Generic Error"}, True), + # This one should return False + (500, {"msg": "...Please try again later...3", "code": "Generic Error"}, False) ], + ids=[ + "Retry on 429", + "Retry on 500 - Generic (first)", + "Retry on 500 - Generic (second)", + "Retry on 500 - Generic (third)", + ] ) -def test_should_retry(status, json, expected, requests_mock): - stream = Lists(authenticator=NoAuth()) +def test_should_retry(status, json, expected, requests_mock, lists_stream): + stream = lists_stream url = f"{stream.url_base}/{stream.path()}" requests_mock.get(url, json=json, status_code=status) test_response = requests.get(url) diff --git a/connectors.md b/connectors.md index dc7101d3d81a..a75a5bbba658 100644 --- a/connectors.md +++ b/connectors.md @@ -110,7 +110,7 @@ | **Instatus** | Instatus icon | Source | airbyte/source-instatus:0.1.0 | alpha | [link](https://docs.airbyte.com/integrations/sources/instatus) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-instatus) | `1901024c-0249-45d0-bcac-31a954652927` | | **Intercom** | Intercom icon | Source | airbyte/source-intercom:0.1.31 | generally_available | [link](https://docs.airbyte.com/integrations/sources/intercom) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-intercom) | `d8313939-3782-41b0-be29-b3ca20d8dd3a` | | **Intruder** | Intruder icon | Source | airbyte/source-intruder:0.1.0 | alpha | [link](https://docs.airbyte.com/integrations/sources/intruder) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-intruder) | `3d15163b-11d8-412f-b808-795c9b2c3a3a` | -| **Iterable** | Iterable icon | Source | airbyte/source-iterable:0.1.25 | generally_available | [link](https://docs.airbyte.com/integrations/sources/iterable) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-iterable) | `2e875208-0c0b-4ee4-9e92-1cb3156ea799` | +| **Iterable** | Iterable icon | Source | airbyte/source-iterable:0.1.26 | generally_available | [link](https://docs.airbyte.com/integrations/sources/iterable) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-iterable) | `2e875208-0c0b-4ee4-9e92-1cb3156ea799` | | **Jenkins** | Jenkins icon | Source | farosai/airbyte-jenkins-source:0.1.23 | alpha | [link](https://docs.airbyte.com/integrations/sources/jenkins) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/airbyte-jenkins-source) | `d6f73702-d7a0-4e95-9758-b0fb1af0bfba` | | **Jira** | Jira icon | Source | airbyte/source-jira:0.3.4 | beta | [link](https://docs.airbyte.com/integrations/sources/jira) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-jira) | `68e63de2-bb83-4c7e-93fa-a8a9051e3993` | | **K6 Cloud** | K6 Cloud icon | Source | airbyte/source-k6-cloud:0.1.0 | alpha | [link](https://docs.airbyte.com/integrations/sources/k6-cloud) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-k6-cloud) | `e300ece7-b073-43a3-852e-8aff36a57f13` | diff --git a/docs/integrations/sources/iterable.md b/docs/integrations/sources/iterable.md index 547ef8e2fb01..5e60d82031be 100644 --- a/docs/integrations/sources/iterable.md +++ b/docs/integrations/sources/iterable.md @@ -76,6 +76,7 @@ The Iterable source connector supports the following [sync modes](https://docs.a | Version | Date | Pull Request | Subject | |:--------|:-----------|:---------------------------------------------------------|:---------------------------------------------------------------------------| +| 0.1.26 | 2023-03-10 | [23938](https://github.com/airbytehq/airbyte/pull/23938) | Improve retry for `500 - Generic Error` | | 0.1.25 | 2023-03-07 | [23821](https://github.com/airbytehq/airbyte/pull/23821) | Added retry for `500 - Generic Error`, increased max attempts number to `6` to handle `ChunkedEncodingError` | | 0.1.24 | 2023-02-14 | [22979](https://github.com/airbytehq/airbyte/pull/22979) | Specified date formatting in specification | | 0.1.23 | 2023-01-27 | [22011](https://github.com/airbytehq/airbyte/pull/22011) | Set `AvailabilityStrategy` for streams explicitly to `None` |