Skip to content

Commit

Permalink
Initial implementation of RequestExecutor, and test.
Browse files Browse the repository at this point in the history
This is a throttling/retrying utility based on backported Python 3
Executors and Futures.

This implementation and test are a mess, but are at least partially
correct.

#69
  • Loading branch information
jeremyBanks committed May 17, 2014
1 parent c827f99 commit 3a5b7fa
Show file tree
Hide file tree
Showing 4 changed files with 278 additions and 1 deletion.
1 change: 1 addition & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ branches:
- test-manish
- test-jeremy
- test-cpython
- test-pypy
env:
global:
- secure: "l3wVjDs94gpM1p+lmjBHPDJm65qMb4hCc9c6IA+0IAKCnLX0CNqzn5H4R3JIswOb9d8nMyF1iU/wwv7KBoZTnm8dSqoiv+puD82xc+auR3fF/pcxK2jZq6qgXTRlfgAst80qJbH2g5mNLLI30/mSHoHI/5FWLmjaoeifA4O1rkE="
Expand Down
192 changes: 192 additions & 0 deletions chatexchange/requestexecutor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
import Queue
import functools
import logging
import threading

import concurrent.futures
import time


logger = logging.getLogger(__name__)


class RequestExecutor(concurrent.futures.Executor):
    """
    An Executor which will run calls consecutively, rate-limited, and retries failures.
    see https://docs.python.org/3.4/library/concurrent.futures.html#concurrent.futures.Executor
    @iattr max_attempts: The maximum number of times a request will be called/retried.
    @iattr min_interval: The minimum amount of time that must elapse between request call invocations.
    """

    def __init__(self, min_interval=3.0, max_attempts=5):
        # Public throttle/retry configuration; max_attempts is also read
        # by RequestFuture._attempt().
        self.min_interval = min_interval
        self.max_attempts = max_attempts
        # Pending requests, ordered by RequestFuture._time (the earliest
        # wall-clock time each request may next be attempted).
        self._request_queue = Queue.PriorityQueue()
        # FIFO hand-off queue: submit() pushes here from caller threads;
        # only the worker thread moves entries onto _request_queue.
        self._new_requests = Queue.Queue()

        # NOTE(review): read/written from both caller and worker threads
        # without a lock; presumably relies on the atomic bool store — confirm.
        self._shutdown = False

        # Earliest wall-clock time at which the next attempt may start
        # (the global rate limit; advanced after every attempt).
        self._time = time.time()

        # Non-daemon so the process stays alive until the worker drains
        # its queues after shutdown() and exits on its own.
        self._thread = threading.Thread(target=self._work)
        self._thread.daemon = False
        self._thread.start()

    def _work(self):
        """Worker-thread loop: throttles, runs, re-queues and drains requests."""
        logger.debug("Worker thread for %r starting", self)

        # keep working until we shut down submissions and there's nothing left to progress
        # the order of the queue .empty() calls below is significant
        while True:
            #not (self._shutdown and self._new_requests.empty() and self._request_queue.empty())
            # Positive while the global rate limit is still in effect.
            global_interval_remaining = self._time - time.time()
            request = None

            if global_interval_remaining > 0:
                # Still throttled: don't pop anything yet; just wait for
                # new submissions until the throttle expires.
                new_requests_wait_timeout = global_interval_remaining
            else:
                try:
                    # Lowest ._time (soonest-eligible) request first.
                    request = self._request_queue.get_nowait()
                    assert not request.done()
                except Queue.Empty:
                    pass

            if request is not None:
                logger.debug("Worker examining %r", request)
                # check if the highest-priority request is allowed to run yet
                interval_remaining = request._time - time.time()

                if interval_remaining > 0:
                    # if not, we'll wait for new requests until it is
                    new_requests_wait_timeout = interval_remaining
                    self._request_queue.put(request)
                else:
                    # if so, run it.
                    new_requests_wait_timeout = None

                    not_cancelled = request.running() or request.set_running_or_notify_cancel()
                    if not_cancelled:
                        logger.info("Worker attempting to run %r", request)
                        request._attempt()
                        # Advance the global throttle measured from *after*
                        # the attempt completed.
                        self._time = time.time() + self.min_interval

                        # put the request back on the queue if it isn't done
                        if not request.done():
                            self._request_queue.put(request)
                    else:
                        logger.info("Ignoring cancelled request")

                    self._request_queue.task_done()
            else:
                logger.debug("No actions in request queue. Waiting for activity on new request queue.")
                # Poll without blocking so an idle-and-shut-down state is
                # noticed promptly by the Empty handler below.
                new_requests_wait_timeout = 0.0

            try:
                new_request = None
                if new_requests_wait_timeout is not None:
                    # wait for new requests until we know we have an existing request to run.
                    logger.debug("Worker waiting for new requests (timeout=%s).", new_requests_wait_timeout)
                    new_request = self._new_requests.get(timeout=new_requests_wait_timeout)
                    self._request_queue.put(new_request)
                    self._new_requests.task_done()

                while True:
                    # put everything available into the request queue now, no waiting.
                    logger.debug("Worker loading all new requests, not waiting.")
                    new_request = self._new_requests.get_nowait()
                    self._request_queue.put(new_request)
                    self._new_requests.task_done()
            except Queue.Empty:
                # Exit only once the throttle has expired, shutdown was
                # requested, and this iteration found no work anywhere.
                if global_interval_remaining <= 0 and self._shutdown and request is None and new_request is None:
                    break

                logger.debug("Continuing loop - shutdown=%r, request=%r, new_request=%r",
                             self._shutdown, request, new_request)

        logger.info("worker thread done")


    def submit(self, fn, *args, **kwargs):
        """
        Schedules fn(*args, **kwargs) to run and returns its RequestFuture.

        @raise RuntimeError: if the executor has already been shut down.
        """
        if self._shutdown:
            raise RuntimeError("Executor already shutdown.")

        request = RequestFuture(self, fn, args, kwargs)

        self._new_requests.put(request)

        return request

    def shutdown(self, wait=True):
        """
        Stops accepting new submissions.

        @param wait: If True, blocks until both queues have been drained
                     (via Queue.join), which the worker signals per-item
                     with task_done().
        """
        if not self._shutdown:
            logger.info("Shutting down %r", self)

            self._shutdown = True

            if wait:
                logger.info("waiting for %s new requests and %s queued requests",
                            self._new_requests.qsize(), self._request_queue.qsize())
                self._new_requests.join()
                self._request_queue.join()
                logger.info("anticipated worker shutdown complete")


@functools.total_ordering
class RequestFuture(concurrent.futures.Future):
    """
    A Future for a single request managed by a RequestExecutor.

    @iattr time: The minimum time at which this request may next be attempted.
                 Lower times are processed first.
    @iattr future: The result of the request.
    @type future: L{concurrent.futures.Future}
    """

    def __init__(self, executor, fn, args, kwargs):
        self._executor = executor  # read for .max_attempts in _attempt()
        self._fn = fn
        self._args = args
        self._kwargs = kwargs
        # Earliest wall-clock time the next attempt may run; also the
        # priority key in the executor's PriorityQueue.
        self._time = time.time()
        # Every exception raised by attempts so far; its length counts
        # the failed attempts against max_attempts.
        self._exceptions = []

        super(RequestFuture, self).__init__()

    def __eq__(self, other):
        # Ordering/equality is by ._time so the PriorityQueue pops the
        # request eligible to run soonest. Return NotImplemented for
        # foreign types instead of raising AttributeError on other._time.
        if not isinstance(other, RequestFuture):
            return NotImplemented
        return self._time == other._time

    def __lt__(self, other):
        if not isinstance(other, RequestFuture):
            return NotImplemented
        return self._time < other._time

    # Defining __eq__ removes the inherited __hash__ on Python 3, but
    # Futures must remain hashable (concurrent.futures waiters keep them
    # in sets); restore identity-based hashing explicitly.
    __hash__ = concurrent.futures.Future.__hash__

    def _attempt(self):
        """
        Attempts to run the request, if it isn't already .done().

        On success, stores the result. On RequestAttemptFailed, pushes
        ._time forward by the requested interval and only fails the
        future once max_attempts have been exhausted. Any other
        exception fails the future immediately.
        """
        assert not self.done()

        try:
            result = self._fn(*self._args, **self._kwargs)
            self.set_result(result)
        except RequestAttemptFailed as ex:
            self._time = time.time() + ex.min_interval
            self._exceptions.append(ex)
            if len(self._exceptions) >= self._executor.max_attempts:
                logger.info("Last attempt failed for request: %r", ex)
                self.set_exception(ex)
        except Exception as ex:
            logger.exception("Unexpected exception in request attempt")
            self._exceptions.append(ex)
            self.set_exception(ex)


class RequestAttemptFailed(Exception):
    """
    Raised to indicate that an attempt to run a request has failed, but may be retried.
    @iattr min_interval: The minimum amount of time which must elapse before a retry.
    @iattr exception: The exception that caused the failure, or None.
    """

    def __init__(self, min_interval=0.0, exception=None):
        # Populate Exception.args first, then mirror the values onto the
        # documented named attributes.
        Exception.__init__(self, min_interval, exception)
        self.min_interval = min_interval
        self.exception = exception
3 changes: 2 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@

setuptools.setup(
name='ChatExchange',
version='0.0.1a2dev2',
version='0.0.1a2dev3',
url='https://github.com/Manishearth/ChatExchange',
packages=[
'chatexchange'
],
install_requires=[
'beautifulsoup4==4.3.2',
'futures==2.1.6',
'requests==2.2.1',
'websocket-client==0.13.0',
# only for dev:
Expand Down
83 changes: 83 additions & 0 deletions test/test_throttling.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
import time
import logging

from chatexchange import requestexecutor


logger = logging.getLogger(__name__)


def test_throttling():
    """
    Tests the behaviour of the executor: successful calls are spaced at
    least min_interval apart, and a call raising RequestAttemptFailed is
    retried no sooner than the interval it requests.
    """

    target_interval = 1.0
    max_interval = 1.1  # allow a little scheduling slack above the minimum

    logger.info("Creating RequestExecutor")
    with requestexecutor.RequestExecutor(
        min_interval=target_interval,
        max_attempts=2
    ) as executor:
        assert executor._thread.is_alive()

        # wall-clock time of every attempt, in execution order
        times = []

        def successful_consecutively(value):
            # Asserts that at least target_interval (and not much more)
            # has passed since the previously recorded attempt.
            times.append(time.time())
            if len(times) > 1:
                interval = times[-1] - times[-2]

                assert interval >= target_interval, "interval %s < %s" % (interval, target_interval)
                assert interval <= max_interval

            return value

        retry_times = []
        def retry_in_5_first_time(value):
            # retry_times is shared by e/f/g, so only the very first of
            # those attempts fails, requesting a retry >= 5s later.
            times.append(time.time())
            retry_times.append(time.time())
            if len(retry_times) == 1:
                raise requestexecutor.RequestAttemptFailed(5.0)
            return value

        a = executor.submit(successful_consecutively, 'a')
        b = executor.submit(successful_consecutively, 'b')
        c = executor.submit(successful_consecutively, 'c')
        d = executor.submit(successful_consecutively, 'd')
        e = executor.submit(retry_in_5_first_time, 'e')
        f = executor.submit(retry_in_5_first_time, 'f')
        g = executor.submit(retry_in_5_first_time, 'g')

        # .result() blocks until the corresponding request has run.
        assert b.result() == 'b'
        assert a.result() == 'a'

        # NOTE(review): timing-dependent — asserts exactly a and b have
        # been attempted by this point; confirm this isn't flaky.
        assert len(times) == 2

        assert executor._thread.is_alive()

        assert f.result() == 'f'
        # NOTE(review): e's retry was deferred ~5s, so it should still be
        # pending just after f completes — timing-dependent.
        assert not e.done()  # because it was retried
        assert e.result() == 'e'
        assert e.done()

    # Leaving the `with` block calls executor.shutdown(wait=True).
    logger.info("RequestExecutor has shut down")

    # Deltas between consecutive attempt timestamps.
    intervals = [b - a for (a, b) in zip(times[0:], times[1:])]

    logger.info('times: %r', times)
    logger.info('intervals: %r', intervals)

    assert c.result() == 'c'
    assert d.result() == 'd'
    assert f.result() == 'f'
    assert g.result() == 'g'
    # NOTE(review): presumably 5s requested delay minus time already
    # consumed by the other queued calls lands near 3s — confirm.
    assert 2.9 <= intervals[-1] <= 3.1  # the retried call


if __name__ == '__main__':
    # Run the test directly with verbose output. The redundant local
    # `import logging` was removed: logging is already imported at the
    # top of this module.
    logging.basicConfig(level=logging.DEBUG)
    test_throttling()

1 comment on commit 3a5b7fa

@banksJeremy
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The timestamp on this commit is wrong due to careless amending; I actually only just completed it.

Please sign in to comment.