How to Turn requests into an Asynchronous HTTP Library


#Python


2014-10-04

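requests is an excellent HTTP library for Python that makes HTTP operations such as GET and POST very simple. However, the network requests it performs are all synchronous: after the CPU issues a request, the IO subsystem sends it and waits for the response, and during that time the CPU does nothing, so its computing capacity is wasted. It is therefore worth trying to make the network requests asynchronous, so that while the IO is in progress the CPU works on other parts of the program and comes back to process the data once the response has arrived. This post describes how to turn requests into an asynchronous HTTP library.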

requests

Installation:

sudo pip install requests

An example that fetches a page (test01.py):

# -*- coding: utf-8 -*- 

import sys
reload(sys)
sys.setdefaultencoding('utf-8') 

import requests
r = requests.get('http://letiantian.xyz/')
print r.text

If the network is working, this prints the HTML of the page at http://letiantian.xyz/.

gevent

gevent is a coroutine-based Python networking library that uses greenlet to provide a high-level synchronous API on top of the libev event loop.

Although the description above speaks of a synchronous API, the effect of gevent is asynchronous.

Installation:

sudo pip install gevent

Without gevent, the code of test02.py is:

# -*- coding: utf-8 -*- 

import sys
reload(sys)
sys.setdefaultencoding('utf-8') 

import urllib2

urls = ["http://letiantian.xyz/"] *10

def get_content(url):
	data = urllib2.urlopen(url).read()
	return data

for url in urls:
	get_content(url)

With gevent, the code of test03.py is:

# -*- coding: utf-8 -*- 

import sys
reload(sys)
sys.setdefaultencoding('utf-8') 

import gevent
from gevent import monkey
monkey.patch_socket()

import urllib2

urls = ["http://letiantian.xyz/"] *10

def get_content(url):
	data = urllib2.urlopen(url).read()
	return data


jobs = [gevent.spawn(get_content, url) for url in urls]
gevent.joinall(jobs)

# print jobs[0].value

Comparing the running times of the two:

zsh >> time  python test02.py 
python test02.py  0.05s user 0.02s system 0% cpu 7.362 total
zsh >> time  python test03.py
python test03.py  0.06s user 0.01s system 3% cpu 2.100 total

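As the timings show, with gevent the CPU utilization is higher and the program finishes sooner: about 3.5 times faster in this run (7.362 s versus 2.100 s), and the gap may be larger in other cases.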
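To make the source of the speedup concrete, here is a minimal sketch that overlaps simulated waits. It is not gevent: it uses asyncio from the Python 3 standard library as a stand-in event loop, and fake_fetch, DELAY, run_sequential, run_concurrent and timed are hypothetical names introduced only for this illustration. No real network request is made.

```python
import asyncio
import time

DELAY = 0.05  # assumed per-request wait, standing in for network latency


async def fake_fetch(url):
    # asyncio.sleep suspends this coroutine and lets the event loop run others,
    # which is the role a blocking socket read plays once gevent has patched it.
    await asyncio.sleep(DELAY)
    return "content of " + url


async def run_sequential(urls):
    # Each wait must finish before the next one starts.
    return [await fake_fetch(u) for u in urls]


async def run_concurrent(urls):
    # All waits are started together and overlap in time.
    return await asyncio.gather(*(fake_fetch(u) for u in urls))


def timed(coro_func, urls):
    # Run one of the coroutines above and measure its wall-clock time.
    start = time.perf_counter()
    result = asyncio.run(coro_func(urls))
    return result, time.perf_counter() - start


urls = ["http://letiantian.xyz/"] * 10
seq_result, seq_time = timed(run_sequential, urls)
con_result, con_time = timed(run_concurrent, urls)
print("sequential: %.3fs, concurrent: %.3fs" % (seq_time, con_time))
```

The sequential version pays the delay once per URL, while the concurrent version pays it roughly once in total, which is the same effect seen between test02.py and test03.py.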

The grequests library: wrapping requests with gevent

Installation:

sudo pip install grequests

With grequests, the code of test04.py is as follows:

# -*- coding: utf-8 -*- 

import sys
reload(sys)
sys.setdefaultencoding('utf-8') 

import grequests

urls = ["http://letiantian.xyz/"] *10
reqs = [grequests.get(url) for url in urls ]
response = grequests.map(reqs)

response0 = response[0]

# print response0.text

Running time:

zsh >> time  python test04.py
python test04.py  0.13s user 0.06s system 10% cpu 1.698 total

The grequests library is hosted at https://github.com/kennethreitz/grequests. The code base is small and easy to read.

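Let us now walk through test04.py with the grequests source code at hand. grequests.get(url) is really just grequests.AsyncRequest('GET', url); grequests uses functools.partial to do this conversion. The __init__ method of the AsyncRequest class performs some initialization: self.url is the url given when the object is created, and self.session is a requests.Session object (a new one is created unless a session is passed in). AsyncRequest has a send() method, which calls the request method of self.session and stores the result in self.response. The grequests module also has a module-level send function, whose body is: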

def send(r, pool=None, stream=False):
    if pool != None:
        return pool.spawn(r.send, stream=stream)
    return gevent.spawn(r.send, stream=stream)

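pool is an instance of gevent.pool.Pool, which manages a group of greenlets. Both pool.spawn and gevent.spawn start a greenlet that runs r.send, so either way the request is carried out asynchronously.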
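The functools.partial shortcut behind grequests.get, described earlier, can be illustrated in isolation. The sketch below is written for Python 3 and uses only the standard library. ToyRequest and toy_get are hypothetical names made up for this example. ToyRequest merely records its arguments and is not the real AsyncRequest.

```python
from functools import partial


class ToyRequest(object):
    """Hypothetical stand-in for AsyncRequest; it only stores what it is given."""

    def __init__(self, method, url, **kwargs):
        self.method = method
        self.url = url
        self.kwargs = kwargs


# partial pre-fills the first positional argument ('GET'), so the resulting
# callable only needs the URL and any keyword arguments.
toy_get = partial(ToyRequest, 'GET')

req = toy_get('http://letiantian.xyz/', timeout=5)
print(req.method, req.url, req.kwargs)
```

Calling toy_get(url) is therefore equivalent to calling ToyRequest('GET', url), which mirrors how the get, post and other shortcuts are defined in the grequests source.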

With that understood, the map function in grequests is easy to follow as well.

def map(requests, stream=False, size=None, exception_handler=None):
    requests = list(requests)
    pool = Pool(size) if size else None
    jobs = [send(r, pool, stream=stream) for r in requests]
    gevent.joinall(jobs)
    ret = []
    for request in requests:
        if request.response:
            ret.append(request.response)
        elif exception_handler:
            exception_handler(request, request.exception)
    return ret
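One consequence of the loop at the end of map is that failed requests are left out of the returned list and handed to exception_handler instead. The sketch below reproduces only that collection logic, under clearly stated assumptions: it is written for Python 3, it uses a standard-library thread pool in place of gevent, and FakeAsyncRequest and toy_map are hypothetical names that do not exist in grequests. Failures are simulated rather than caused by real network errors.

```python
from concurrent.futures import ThreadPoolExecutor


class FakeAsyncRequest(object):
    """Hypothetical request object that succeeds or fails on demand."""

    def __init__(self, url, fail=False):
        self.url = url
        self.fail = fail
        self.response = None
        self.exception = None

    def send(self):
        # Same shape as AsyncRequest.send: store the response or the exception.
        try:
            if self.fail:
                raise IOError("simulated failure for " + self.url)
            self.response = "body of " + self.url
        except Exception as e:
            self.exception = e
        return self


def toy_map(requests, size=None, exception_handler=None):
    requests = list(requests)
    workers = size or len(requests) or 1
    with ThreadPoolExecutor(max_workers=workers) as executor:
        for r in requests:
            executor.submit(r.send)
        # Leaving the with block waits for every submitted send to finish,
        # playing the role of gevent.joinall in the original.
    ret = []
    for request in requests:
        if request.response:
            ret.append(request.response)
        elif exception_handler:
            exception_handler(request, request.exception)
    return ret


failures = []
reqs = [FakeAsyncRequest("a"), FakeAsyncRequest("b", fail=True), FakeAsyncRequest("c")]
responses = toy_map(reqs, exception_handler=lambda req, exc: failures.append((req.url, str(exc))))
print(responses)
print(failures)
```

Because the failed request is skipped, the result list is shorter than the input list, so positions in the two lists do not necessarily line up.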

The full grequests source code is included at the end of this post.

The requests-futures library

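requests-futures also wraps requests into an asynchronous library, but it relies on a feature that is new in Python 3, the concurrent.futures module (it works on Python 2 as well, provided the futures package is installed separately). Its implementation is also simple, so it is not covered here.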
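For readers unfamiliar with futures, the following minimal sketch shows the general pattern from the Python 3 standard library. It does not reproduce the requests-futures API: fake_get is a hypothetical function that sleeps instead of performing a real HTTP request, and the variable names are made up for illustration.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fake_get(url):
    # Hypothetical stand-in for an HTTP GET; the sleep simulates network latency.
    time.sleep(0.05)
    return "response for " + url


executor = ThreadPoolExecutor(max_workers=2)

# submit returns immediately with a Future; the work runs in a worker thread.
future_one = executor.submit(fake_get, "http://letiantian.xyz/")
future_two = executor.submit(fake_get, "http://www.gevent.org/")

# The main thread is free to do other work here while both calls are in flight.

# result() blocks only until that particular call has finished.
result_one = future_one.result()
result_two = future_two.result()
executor.shutdown()

print(result_one)
print(result_two)
```

The caller gets a placeholder object right away and collects the actual value later, which is what makes the request appear asynchronous.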

References

http://www.gevent.org/
https://github.com/kennethreitz/grequests
http://www.gevent.org/gevent.monkey.html#gevent.monkey.patch_all
https://github.com/ross/requests-futures
https://pypi.python.org/pypi/futures
http://www.2cto.com/kf/201308/232824.html
http://www.gevent.org/gevent.pool.html

Source code of grequests

# -*- coding: utf-8 -*-

"""
grequests
~~~~~~~~~

This module contains an asynchronous replica of ``requests.api``, powered
by gevent. All API methods return a ``Request`` instance (as opposed to
``Response``). A list of requests can be sent with ``map()``.
"""
from functools import partial

try:
    import gevent
    from gevent import monkey as curious_george
    from gevent.pool import Pool
except ImportError:
    raise RuntimeError('Gevent is required for grequests.')

# Monkey-patch.
curious_george.patch_all(thread=False, select=False)

from requests import Session


__all__ = (
    'map', 'imap',
    'get', 'options', 'head', 'post', 'put', 'patch', 'delete', 'request'
)


class AsyncRequest(object):
    """ Asynchronous request.

    Accept same parameters as ``Session.request`` and some additional:

    :param session: Session which will do request
    :param callback: Callback called on response.
                     Same as passing ``hooks={'response': callback}``
    """
    def __init__(self, method, url, **kwargs):
        #: Request method
        self.method = method
        #: URL to request
        self.url = url
        #: Associated ``Session``
        self.session = kwargs.pop('session', None)
        if self.session is None:
            self.session = Session()

        callback = kwargs.pop('callback', None)
        if callback:
            kwargs['hooks'] = {'response': callback}

        #: The rest arguments for ``Session.request``
        self.kwargs = kwargs
        #: Resulting ``Response``
        self.response = None

    def send(self, **kwargs):
        """
        Prepares request based on parameter passed to constructor and optional ``kwargs```.
        Then sends request and saves response to :attr:`response`

        :returns: ``Response``
        """
        merged_kwargs = {}
        merged_kwargs.update(self.kwargs)
        merged_kwargs.update(kwargs)
        try:
            self.response =  self.session.request(self.method,
                                                self.url, **merged_kwargs)
        except Exception as e:
            self.exception = e
        return self


def send(r, pool=None, stream=False):
    """Sends the request object using the specified pool. If a pool isn't
    specified this method blocks. Pools are useful because you can specify size
    and can hence limit concurrency."""
    if pool != None:
        return pool.spawn(r.send, stream=stream)

    return gevent.spawn(r.send, stream=stream)


# Shortcuts for creating AsyncRequest with appropriate HTTP method
get = partial(AsyncRequest, 'GET')
options = partial(AsyncRequest, 'OPTIONS')
head = partial(AsyncRequest, 'HEAD')
post = partial(AsyncRequest, 'POST')
put = partial(AsyncRequest, 'PUT')
patch = partial(AsyncRequest, 'PATCH')
delete = partial(AsyncRequest, 'DELETE')

# synonym
def request(method, url, **kwargs):
    return AsyncRequest(method, url, **kwargs)


def map(requests, stream=False, size=None, exception_handler=None):
    """Concurrently converts a list of Requests to Responses.

    :param requests: a collection of Request objects.
    :param stream: If True, the content will not be downloaded immediately.
    :param size: Specifies the number of requests to make at a time. If None, no throttling occurs.
    :param exception_handler: Callback function, called when exception occured. Params: Request, Exception
    """

    requests = list(requests)

    pool = Pool(size) if size else None
    jobs = [send(r, pool, stream=stream) for r in requests]
    gevent.joinall(jobs)

    ret = []

    for request in requests:
        if request.response:
            ret.append(request.response)
        elif exception_handler:
            exception_handler(request, request.exception)

    return ret


def imap(requests, stream=False, size=2, exception_handler=None):
    """Concurrently converts a generator object of Requests to
    a generator of Responses.

    :param requests: a generator of Request objects.
    :param stream: If True, the content will not be downloaded immediately.
    :param size: Specifies the number of requests to make at a time. default is 2
    :param exception_handler: Callback function, called when exception occured. Params: Request, Exception
    """

    pool = Pool(size)

    def send(r):
        return r.send(stream=stream)

    for request in pool.imap_unordered(send, requests):
        if request.response:
            yield request.response
        elif exception_handler:
            exception_handler(request, request.exception)

    pool.join()


(End of article)