Skip to content
This repository has been archived by the owner on Dec 16, 2017. It is now read-only.

Commit

Permalink
No, use local grequests, sigh.
Browse files Browse the repository at this point in the history
This reverts commit 1bf5974.
  • Loading branch information
Kyle Maxwell committed Aug 6, 2014
1 parent 2955e07 commit abe00de
Showing 1 changed file with 149 additions and 0 deletions.
149 changes: 149 additions & 0 deletions grequests.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
# -*- coding: utf-8 -*-

"""
grequests
~~~~~~~~~
This module contains an asynchronous replica of ``requests.api``, powered
by gevent. All API methods return a ``Request`` instance (as opposed to
``Response``). A list of requests can be sent with ``map()``.
"""
from functools import partial

try:
import gevent
from gevent import monkey as curious_george
from gevent.pool import Pool
except ImportError:
raise RuntimeError('Gevent is required for grequests.')

# Monkey-patch.
curious_george.patch_all(thread=False, select=False)

from requests import Session


__all__ = (
'map', 'imap',
'get', 'options', 'head', 'post', 'put', 'patch', 'delete', 'request'
)


class AsyncRequest(object):
""" Asynchronous request.
Accept same parameters as ``Session.request`` and some additional:
:param session: Session which will do request
:param callback: Callback called on response.
Same as passing ``hooks={'response': callback}``
"""
def __init__(self, method, url, **kwargs):
#: Request method
self.method = method
#: URL to request
self.url = url
#: Associated ``Session``
self.session = kwargs.pop('session', None)
if self.session is None:
self.session = Session()

callback = kwargs.pop('callback', None)
if callback:
kwargs['hooks'] = {'response': callback}

#: The rest arguments for ``Session.request``
self.kwargs = kwargs
#: Resulting ``Response``
self.response = None

def send(self, **kwargs):
"""
Prepares request based on parameter passed to constructor and optional ``kwargs```.
Then sends request and saves response to :attr:`response`
:returns: ``Response``
"""
merged_kwargs = {}
merged_kwargs.update(self.kwargs)
merged_kwargs.update(kwargs)
try:
self.response = self.session.request(self.method,
self.url, **merged_kwargs)
except Exception as e:
self.exception = e
return self


def send(r, pool=None, stream=False):
"""Sends the request object using the specified pool. If a pool isn't
specified this method blocks. Pools are useful because you can specify size
and can hence limit concurrency."""
if pool != None:
return pool.spawn(r.send, stream=stream)

return gevent.spawn(r.send, stream=stream)


# Shortcuts for creating AsyncRequest with appropriate HTTP method
get = partial(AsyncRequest, 'GET')
options = partial(AsyncRequest, 'OPTIONS')
head = partial(AsyncRequest, 'HEAD')
post = partial(AsyncRequest, 'POST')
put = partial(AsyncRequest, 'PUT')
patch = partial(AsyncRequest, 'PATCH')
delete = partial(AsyncRequest, 'DELETE')

# synonym
def request(method, url, **kwargs):
return AsyncRequest(method, url, **kwargs)


def map(requests, stream=False, size=None, exception_handler=None):
"""Concurrently converts a list of Requests to Responses.
:param requests: a collection of Request objects.
:param stream: If True, the content will not be downloaded immediately.
:param size: Specifies the number of requests to make at a time. If None, no throttling occurs.
:param exception_handler: Callback function, called when exception occured. Params: Request, Exception
"""

requests = list(requests)

pool = Pool(size) if size else None
jobs = [send(r, pool, stream=stream) for r in requests]
gevent.joinall(jobs)

ret = []

for request in requests:
if request.response:
ret.append(request.response)
elif exception_handler:
exception_handler(request, request.exception)

return ret


def imap(requests, stream=False, size=2, exception_handler=None):
"""Concurrently converts a generator object of Requests to
a generator of Responses.
:param requests: a generator of Request objects.
:param stream: If True, the content will not be downloaded immediately.
:param size: Specifies the number of requests to make at a time. default is 2
:param exception_handler: Callback function, called when exception occured. Params: Request, Exception
"""

pool = Pool(size)

def send(r):
return r.send(stream=stream)

for request in pool.imap_unordered(send, requests):
if request.response:
yield request.response
elif exception_handler:
exception_handler(request, request.exception)

pool.join()

0 comments on commit abe00de

Please sign in to comment.