Skip to content

Commit

Permalink
feature: Add retry logic in `LimitedConcurrencyClient` in case of `BusyError`
Browse files Browse the repository at this point in the history

TASK: IL-431
  • Loading branch information
FlorianSchepersAA committed Apr 25, 2024
1 parent 1c4f153 commit 35ef907
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 2 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
- feature: `Llama3InstructModel` to support llama-3 models in Aleph Alpha API
- feature: `ExpandChunks`-task caches chunked documents by ID
- feature: `Runner.run_dataset` now has a configurable number of workers via `max_workers` and defaults to the previous value, which is 10.
- feature: In case a `BusyError` is raised during a `complete` call, the `LimitedConcurrencyClient` will retry until `max_retry_time` is reached.

### Fixes
- fix: `HuggingFaceRepository` no longer is a dataset repository. This also means that `HuggingFaceAggregationRepository` no longer is a dataset repository.
Expand Down
30 changes: 28 additions & 2 deletions src/intelligence_layer/connectors/limited_concurrency_client.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import time
from functools import lru_cache
from os import getenv
from threading import Semaphore
Expand Down Expand Up @@ -102,14 +103,19 @@ class LimitedConcurrencyClient:
client: The wrapped `Client`.
max_concurrency: the maximal number of requests that may run concurrently
against the API.
max_retry_time: the maximal time in seconds a complete is retried in case a `BusyError` is raised.
"""

def __init__(
    self,
    client: AlephAlphaClientProtocol,
    max_concurrency: int = 20,
    max_retry_time: int = 600,
) -> None:
    """Wrap `client`, limiting how many requests may run concurrently.

    Args:
        client: The wrapped `Client`; every call is delegated to it.
        max_concurrency: The maximal number of requests that may run
            concurrently against the API.
        max_retry_time: The maximal time in seconds a `complete` call is
            retried in case a `BusyError` is raised.
    """
    self._client = client
    # The semaphore is acquired around every delegated API call so that at
    # most `max_concurrency` requests are in flight at the same time.
    self._concurrency_limit_semaphore = Semaphore(max_concurrency)
    self._max_retry_time = max_retry_time

@classmethod
@lru_cache(maxsize=1)
Expand Down Expand Up @@ -143,7 +149,27 @@ def complete(
model: str,
) -> CompletionResponse:
with self._concurrency_limit_semaphore:
return self._client.complete(request, model)
retries = 0
start_time = time.time()
latest_exception = None
while time.time() - start_time < self._max_retry_time:
try:
return self._client.complete(request, model)
except Exception as e:
latest_exception = e
if e.args[0] == 503:
time.sleep(
min(
2**retries,
self._max_retry_time - (time.time() - start_time),
)
)
retries += 1
continue
else:
raise e
assert latest_exception is not None
raise latest_exception

def get_version(self) -> str:
with self._concurrency_limit_semaphore:
Expand Down
51 changes: 51 additions & 0 deletions tests/connectors/test_limited_concurrency_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from time import sleep
from typing import cast

import pytest
from aleph_alpha_client import CompletionRequest, CompletionResponse, Prompt
from pytest import fixture

Expand Down Expand Up @@ -37,6 +38,25 @@ def complete(self, request: CompletionRequest, model: str) -> CompletionResponse
)


class BusyClient:
    """Test double whose first `complete` call fails with a busy (503) error.

    Every later call either raises `return_value` (when it is an exception)
    or returns it as the completion result.
    """

    def __init__(
        self,
        return_value: CompletionResponse | Exception,
    ) -> None:
        # Counts every invocation of `complete`, including the failing first one.
        self.number_of_retries: int = 0
        self.return_value = return_value

    def complete(self, request: CompletionRequest, model: str) -> CompletionResponse:
        self.number_of_retries += 1
        # The very first call always simulates a busy API.
        if self.number_of_retries < 2:
            raise Exception(503)
        if isinstance(self.return_value, Exception):
            raise self.return_value
        return self.return_value


TEST_MAX_CONCURRENCY = 3


Expand Down Expand Up @@ -66,3 +86,34 @@ def test_methods_concurrency_is_limited(
["model"] * TEST_MAX_CONCURRENCY * 10,
)
assert concurrency_counting_client.max_concurrency_counter == TEST_MAX_CONCURRENCY


def test_limited_concurrency_client_retries() -> None:
    """A completion that first hits a 503 is retried until it succeeds."""
    expected_completion = CompletionResponse(
        model_version="model-version",
        completions=[],
        optimized_prompt=None,
        num_tokens_generated=0,
        num_tokens_prompt_total=0,
    )
    # BusyClient fails its first call with 503, then returns the completion.
    client = LimitedConcurrencyClient(
        cast(AlephAlphaClientProtocol, BusyClient(return_value=expected_completion))
    )
    completion = client.complete(CompletionRequest(prompt=Prompt("")), "model")
    assert completion == expected_completion


def test_limited_concurrency_client_throws_exception() -> None:
    """Non-busy errors (e.g. 404) must propagate instead of being retried."""
    expected_exception = Exception(404)
    busy_client = BusyClient(return_value=expected_exception)
    limited_concurrency_client = LimitedConcurrencyClient(
        cast(AlephAlphaClientProtocol, busy_client)
    )
    with pytest.raises(Exception) as e:
        limited_concurrency_client.complete(
            CompletionRequest(prompt=Prompt("")), "model"
        )
    # NOTE: this assertion must sit OUTSIDE the `with` block — inside it,
    # any line after the raising call would never execute (dead code).
    assert e.value == expected_exception

0 comments on commit 35ef907

Please sign in to comment.