Skip to content

Commit

Permalink
🐛 Source Iterable: improve retry on 500 - Generic Error (airbytehq#…
Browse files Browse the repository at this point in the history
  • Loading branch information
bazarnov authored Mar 11, 2023
1 parent 00752aa commit 8f7642f
Show file tree
Hide file tree
Showing 9 changed files with 75 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -950,7 +950,7 @@
- name: Iterable
sourceDefinitionId: 2e875208-0c0b-4ee4-9e92-1cb3156ea799
dockerRepository: airbyte/source-iterable
dockerImageTag: 0.1.25
dockerImageTag: 0.1.26
documentationUrl: https://docs.airbyte.com/integrations/sources/iterable
icon: iterable.svg
sourceType: api
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7003,7 +7003,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-iterable:0.1.25"
- dockerImage: "airbyte/source-iterable:0.1.26"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/sources/iterable"
connectionSpecification:
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-iterable/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@ RUN pip install .
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.1.25
LABEL io.airbyte.version=0.1.26
LABEL io.airbyte.name=airbyte/source-iterable
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from requests import codes
from requests.exceptions import ChunkedEncodingError
from source_iterable.slice_generators import AdjustableSliceGenerator, RangeSliceGenerator, StreamSlice
from source_iterable.utils import dateutil_parse
from source_iterable.utils import IterableGenericErrorHandler, dateutil_parse

EVENT_ROWS_LIMIT = 200
CAMPAIGNS_PER_REQUEST = 20
Expand All @@ -31,13 +31,18 @@ class IterableStream(HttpStream, ABC):
# in case we get a 401 error (api token disabled or deleted) on a stream slice, do not make further requests within the current stream
# to prevent 429 error on other streams
ignore_further_slices = False
# to handle the Generic Errors (500 with msg pattern)
generic_error_handler: IterableGenericErrorHandler = IterableGenericErrorHandler()

url_base = "https://api.iterable.com/api/"
primary_key = "id"

def __init__(self, authenticator):
self._cred = authenticator
super().__init__(authenticator)
# placeholder for last slice used for API request
# to reuse it later in logs or whatever
self._last_slice: Mapping[str, Any] = {}

@property
def retry_factor(self) -> int:
Expand Down Expand Up @@ -76,7 +81,7 @@ def next_page_token(self, response: requests.Response) -> Optional[Mapping[str,
def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
if not self.check_unauthorized_key(response):
return []
response_json = response.json()
response_json = response.json() or {}
records = response_json.get(self.data_field, [])

for record in records:
Expand All @@ -86,12 +91,10 @@ def should_retry(self, response: requests.Response) -> bool:
# check the authentication
if not self.check_unauthorized_key(response):
return False
# retry on generic error 500 meaning
# retry on generic error 500
if response.status_code == 500:
if response.json().get("code") == "GenericError" and "Please try again later" in response.json().get("msg"):
self.logger.warn(f"Generic Server Error occured for stream: `{self.name}`.")
setattr(self, "raise_on_http_errors", False)
return True
# will retry for 2 times, then give up and skip the fetch for slice
return self.generic_error_handler.handle(response, self.name, self._last_slice)
# all other cases
return super().should_retry(response)

Expand All @@ -104,6 +107,8 @@ def read_records(
) -> Iterable[Mapping[str, Any]]:
if self.ignore_further_slices:
return []
# save last slice
self._last_slice = stream_slice
yield from super().read_records(sync_mode, cursor_field=cursor_field, stream_slice=stream_slice, stream_state=stream_state)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,12 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

import logging
from typing import Any, Mapping

import dateutil.parser
import pendulum
import requests


def dateutil_parse(text):
Expand All @@ -22,3 +26,32 @@ def dateutil_parse(text):
dt.microsecond,
tz=dt.tzinfo or pendulum.tz.UTC,
)


class IterableGenericErrorHandler:

logger = logging.getLogger("airbyte")

error_count = 0
max_retry = 2

def handle(self, response: requests.Response, stream_name: str, last_slice: Mapping[str, Any] = {}) -> bool:
# error pattern to check
code_pattern = "Generic Error"
msg_pattern = "Please try again later"
# prepare warning message
warning_msg = f"Generic Server Error occured for stream: `{stream_name}`. "
# For cases when there is a slice to go with, but server returns Generic Error - Please try again
# we reetry 2 times, then skipp the record and move on with warning message.
if response.json().get("code") == code_pattern and msg_pattern in response.json().get("msg"):
self.error_count += 1
setattr(self, "raise_on_http_errors", False)
if self.error_count > self.max_retry:
self.logger.warn(warning_msg + f"Skip fetching for slice {last_slice}.")
return False
else:
self.logger.warn(warning_msg + f"Retrying for slice {last_slice}, attempt {self.error_count}")
return True
else:
# All other cases
return True
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,14 @@ def config_fixture():
@pytest.fixture()
def mock_lists_resp(mocker):
mocker.patch("source_iterable.streams.Lists.read_records", return_value=iter([{"id": 1}, {"id": 2}]))


@pytest.fixture(name="lists_stream")
def lists_stream():
# local imports
from airbyte_cdk.sources.streams.http.auth import NoAuth
from source_iterable.streams import Lists

# return the instance of the stream so we could make global tests on it,
# to cover the different `should_retry` logic
return Lists(authenticator=NoAuth())
Original file line number Diff line number Diff line change
Expand Up @@ -164,11 +164,22 @@ def test_iterable_export_stream_backoff_time():
"status, json, expected",
[
(429, {}, True),
(500, {"msg": "...Please try again later...", "code": "Generic Error"}, True)
# for 500 - Generic error we should make 2 retry attempts
# and give up on third one!
(500, {"msg": "...Please try again later...1", "code": "Generic Error"}, True),
(500, {"msg": "...Please try again later...2", "code": "Generic Error"}, True),
# This one should return False
(500, {"msg": "...Please try again later...3", "code": "Generic Error"}, False)
],
ids=[
"Retry on 429",
"Retry on 500 - Generic (first)",
"Retry on 500 - Generic (second)",
"Retry on 500 - Generic (third)",
]
)
def test_should_retry(status, json, expected, requests_mock):
stream = Lists(authenticator=NoAuth())
def test_should_retry(status, json, expected, requests_mock, lists_stream):
stream = lists_stream
url = f"{stream.url_base}/{stream.path()}"
requests_mock.get(url, json=json, status_code=status)
test_response = requests.get(url)
Expand Down
2 changes: 1 addition & 1 deletion connectors.md
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@
| **Instatus** | <img alt="Instatus icon" src="https://raw.githubusercontent.com/airbytehq/airbyte/master/airbyte-config/init/src/main/resources/icons/instatus.svg" height="30" height="30"/> | Source | airbyte/source-instatus:0.1.0 | alpha | [link](https://docs.airbyte.com/integrations/sources/instatus) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-instatus) | <small>`1901024c-0249-45d0-bcac-31a954652927`</small> |
| **Intercom** | <img alt="Intercom icon" src="https://raw.githubusercontent.com/airbytehq/airbyte/master/airbyte-config/init/src/main/resources/icons/intercom.svg" height="30" height="30"/> | Source | airbyte/source-intercom:0.1.31 | generally_available | [link](https://docs.airbyte.com/integrations/sources/intercom) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-intercom) | <small>`d8313939-3782-41b0-be29-b3ca20d8dd3a`</small> |
| **Intruder** | <img alt="Intruder icon" src="https://raw.githubusercontent.com/airbytehq/airbyte/master/airbyte-config/init/src/main/resources/icons/intruder.svg" height="30" height="30"/> | Source | airbyte/source-intruder:0.1.0 | alpha | [link](https://docs.airbyte.com/integrations/sources/intruder) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-intruder) | <small>`3d15163b-11d8-412f-b808-795c9b2c3a3a`</small> |
| **Iterable** | <img alt="Iterable icon" src="https://raw.githubusercontent.com/airbytehq/airbyte/master/airbyte-config/init/src/main/resources/icons/iterable.svg" height="30" height="30"/> | Source | airbyte/source-iterable:0.1.25 | generally_available | [link](https://docs.airbyte.com/integrations/sources/iterable) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-iterable) | <small>`2e875208-0c0b-4ee4-9e92-1cb3156ea799`</small> |
| **Iterable** | <img alt="Iterable icon" src="https://raw.githubusercontent.com/airbytehq/airbyte/master/airbyte-config/init/src/main/resources/icons/iterable.svg" height="30" height="30"/> | Source | airbyte/source-iterable:0.1.26 | generally_available | [link](https://docs.airbyte.com/integrations/sources/iterable) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-iterable) | <small>`2e875208-0c0b-4ee4-9e92-1cb3156ea799`</small> |
| **Jenkins** | <img alt="Jenkins icon" src="https://raw.githubusercontent.com/airbytehq/airbyte/master/airbyte-config/init/src/main/resources/icons/jenkins.svg" height="30" height="30"/> | Source | farosai/airbyte-jenkins-source:0.1.23 | alpha | [link](https://docs.airbyte.com/integrations/sources/jenkins) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/airbyte-jenkins-source) | <small>`d6f73702-d7a0-4e95-9758-b0fb1af0bfba`</small> |
| **Jira** | <img alt="Jira icon" src="https://raw.githubusercontent.com/airbytehq/airbyte/master/airbyte-config/init/src/main/resources/icons/jira.svg" height="30" height="30"/> | Source | airbyte/source-jira:0.3.4 | beta | [link](https://docs.airbyte.com/integrations/sources/jira) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-jira) | <small>`68e63de2-bb83-4c7e-93fa-a8a9051e3993`</small> |
| **K6 Cloud** | <img alt="K6 Cloud icon" src="https://raw.githubusercontent.com/airbytehq/airbyte/master/airbyte-config/init/src/main/resources/icons/k6cloud.svg" height="30" height="30"/> | Source | airbyte/source-k6-cloud:0.1.0 | alpha | [link](https://docs.airbyte.com/integrations/sources/k6-cloud) | [code](https://github.com/airbytehq/airbyte/tree/master/airbyte-integrations/connectors/source-k6-cloud) | <small>`e300ece7-b073-43a3-852e-8aff36a57f13`</small> |
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/sources/iterable.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ The Iterable source connector supports the following [sync modes](https://docs.a

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:---------------------------------------------------------|:---------------------------------------------------------------------------|
| 0.1.26 | 2023-03-10 | [23938](https://github.com/airbytehq/airbyte/pull/23938) | Improve retry for `500 - Generic Error` |
| 0.1.25 | 2023-03-07 | [23821](https://github.com/airbytehq/airbyte/pull/23821) | Added retry for `500 - Generic Error`, increased max attempts number to `6` to handle `ChunkedEncodingError` |
| 0.1.24 | 2023-02-14 | [22979](https://github.com/airbytehq/airbyte/pull/22979) | Specified date formatting in specification |
| 0.1.23 | 2023-01-27 | [22011](https://github.com/airbytehq/airbyte/pull/22011) | Set `AvailabilityStrategy` for streams explicitly to `None` |
Expand Down

0 comments on commit 8f7642f

Please sign in to comment.