Skip to content

Commit

Permalink
Failed requests now cause exponential backoff for executor.
Browse files Browse the repository at this point in the history
  • Loading branch information
jeremyBanks committed May 17, 2014
1 parent 3a5b7fa commit 9b47900
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 23 deletions.
24 changes: 19 additions & 5 deletions chatexchange/requestexecutor.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,17 @@ class RequestExecutor(concurrent.futures.Executor):
see https://docs.python.org/3.4/library/concurrent.futures.html#concurrent.futures.Executor
@iattr max_attempts: The maximum number of times a request will be called/retried.
@iattr min_interval: The minimum amount of time that must elapse between request call invocations.
@iattr max_attempts: The maximum number of times a request will be called/retried.
@iattr min_interval: The minimum amount of time that must elapse between request call invocations.
@iattr consecutive_penalty_factor: A factor by which to multiply the min_interval
for each consecutive failure.
"""

def __init__(self, min_interval=3.0, max_attempts=5):
def __init__(self, min_interval=3.0, max_attempts=3, consecutive_penalty_factor=2.0):
self.min_interval = min_interval
self.max_attempts = max_attempts
self.consecutive_penalty_factor = consecutive_penalty_factor

self._request_queue = Queue.PriorityQueue()
self._new_requests = Queue.Queue()

Expand All @@ -37,6 +41,8 @@ def __init__(self, min_interval=3.0, max_attempts=5):
def _work(self):
logger.debug("Worker thread for %r starting", self)

self._consecutive_failures = 0

# keep working until we shut down submissions and there's nothing left to progress
# the order of the queue .empty() calls below is significant
while True:
Expand Down Expand Up @@ -69,8 +75,15 @@ def _work(self):
not_cancelled = request.running() or request.set_running_or_notify_cancel()
if not_cancelled:
logger.info("Worker attempting to run %r", request)
request._attempt()
self._time = time.time() + self.min_interval
successful = request._attempt()

if successful:
self._consecutive_failures = 0
self._time = time.time() + self.min_interval
else:
self._consecutive_failures += 1
self._time = time.time() + (
self.min_interval * self.consecutive_penalty_factor ** self._consecutive_failures)

# put the request back on the queue if it isn't done
if not request.done():
Expand Down Expand Up @@ -166,6 +179,7 @@ def _attempt(self):
try:
result = self._fn(*self._args, **self._kwargs)
self.set_result(result)
return True
except RequestAttemptFailed as ex:
self._time = time.time() + ex.min_interval
self._exceptions.append(ex)
Expand Down
42 changes: 24 additions & 18 deletions test/test_throttling.py → test/test_requestexecutor.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,43 +13,40 @@ def test_throttling():
but with no failures or retries.
"""

min_interval = 0.9
target_interval = 1.0
max_interval = 1.1
consecutive_penalty_factor = 1.5

logger.info("Creating RequestExecutor")
with requestexecutor.RequestExecutor(
min_interval=target_interval,
max_attempts=2
max_attempts=2,
consecutive_penalty_factor=consecutive_penalty_factor,
) as executor:
assert executor._thread.is_alive()

times = []

def successful_consecutively(value):
def simple_success(value):
times.append(time.time())
if len(times) > 1:
interval = times[-1] - times[-2]

assert interval >= target_interval, "interval %s < %s" % (interval, target_interval)
assert interval <= max_interval

return value

retry_times = []
def retry_in_5_first_time(value):
def retry_in_7_first_time(value):
times.append(time.time())
retry_times.append(time.time())
if len(retry_times) == 1:
raise requestexecutor.RequestAttemptFailed(5.0)
raise requestexecutor.RequestAttemptFailed(7.0)
return value

a = executor.submit(successful_consecutively, 'a')
b = executor.submit(successful_consecutively, 'b')
c = executor.submit(successful_consecutively, 'c')
d = executor.submit(successful_consecutively, 'd')
e = executor.submit(retry_in_5_first_time, 'e')
f = executor.submit(retry_in_5_first_time, 'f')
g = executor.submit(retry_in_5_first_time, 'g')
a = executor.submit(simple_success, 'a')
b = executor.submit(simple_success, 'b')
c = executor.submit(simple_success, 'c')
d = executor.submit(simple_success, 'd')
e = executor.submit(retry_in_7_first_time, 'e')
f = executor.submit(retry_in_7_first_time, 'f')
g = executor.submit(retry_in_7_first_time, 'g')

assert b.result() == 'b'
assert a.result() == 'a'
Expand All @@ -74,8 +71,17 @@ def retry_in_5_first_time(value):
assert d.result() == 'd'
assert f.result() == 'f'
assert g.result() == 'g'
assert 2.9 <= intervals[-1] <= 3.1 # the retried call

assert min_interval <= intervals[0] <= max_interval
assert min_interval <= intervals[1] <= max_interval
assert min_interval <= intervals[2] <= max_interval
assert min_interval <= intervals[3] <= max_interval # request 5 is the failure
assert (min_interval * consecutive_penalty_factor <= intervals[4]
<= max_interval * consecutive_penalty_factor)
assert min_interval <= intervals[5] <= max_interval
interval_from_failure_to_success = times[-1] - times[4]
logger.info('interval_from_failure_to_success = %r', interval_from_failure_to_success)
assert 6.9 <= interval_from_failure_to_success <= 7.1

if __name__ == '__main__':
import logging
Expand Down

0 comments on commit 9b47900

Please sign in to comment.