如何在Python中逐个发送异步HTTP请求?

5
我们有一组作业,工作者逐一处理这些作业。每个作业需要我们格式化一些数据并发出一个HTTP POST请求,其中数据是请求有效负载。
如何使每个工作者以单线程、非阻塞的方式异步发出这些HTTP POST请求?我们不关心请求的响应——我们只希望请求尽快执行,然后工作者立即转移到下一个作业。
我们已经尝试使用gevent和grequests库(请参见为什么gevent.spawn不执行参数化函数直到调用Greenlet.join?)。我们的工作者代码看起来像这样:
def execute_task(worker, job):

    print "About to spawn request"
    greenlet = gevent.spawn(requests.post, url, params=params)

    print "Request spawned, about to call sleep"
    gevent.sleep()

    print "Greenlet status: ", greenlet.ready()

第一个打印语句被执行,但第二个和第三个打印语句从未被打印,URL也从未被访问。
我们如何使这些异步请求得以执行?

有一个标准库叫做[asyncore](http://docs.python.org/2 - lucasg
我得同意@georgesl的观点,asyncore是一个很好的迁移选择,因为它能够在后续开发中给你更好的应用灵活性。此外,http://stackoverflow.com/questions/15753901/python-asyncore-client-socket-can-not-determaine-connection-status/15754244#15754244 这里有一个很好的起点和示例,展示了如何使用它(请看我的问题的答案)。如果不行的话,你就必须使用多进程来实现,即使是Python的“子”库,如果可以并行发送请求,也很可能会为你进行线程处理,这就是多进程的特点。 - Torxed
你的 gevent 代码看起来不错(我进行了快速测试,发现它可以正常工作;我使用的是 gevent 1.0b3)。我猜这取决于调用 execute_task 的上下文环境。 - robertklep
我可以问一下您是否真的需要 gevent 吗?使用非标准库始终存在风险,因为它们可能与版本相关,在后续发布中需要更多开发或缺少功能,而标准库不会改变 :) 当我读到您关于版本等方面的评论时,只是想到了这个。 - Torxed
4个回答

1
你可能想使用join方法而不是sleep,然后检查状态。如果你想逐个执行,那么这将解决问题。稍微修改一下你的代码测试一下,似乎可以正常工作。
import gevent
import requests

def execute_task(worker, job):

    print "About to spawn request"
    greenlet = gevent.spawn(requests.get, 'http://example.com', params={})

    print "Request spawned, about to call sleep"
    gevent.sleep()

    print "Greenlet status: ", greenlet.ready()
    print greenlet.get()

execute_task(None, None)

给出结果:
About to spawn request
Request spawned, about to call sleep
Greenlet status:  True
<Response [200]>

这个 Python 进程中是否还有其他可能阻止 Gevent 运行该 greenlet 的操作?


1

1) 创建一个 Queue.Queue 对象

2) 创建尽可能多的“工作”线程,这些线程循环并从 Queue.Queue 中读取数据

3) 将任务放入 Queue.Queue 中

工作线程将按照它们放置在队列中的顺序依次读取队列中的内容

以下是一个示例,它从文件中读取行并将其放入 Queue.Queue 中

import sys
import urllib2
import urllib
from Queue import Queue
import threading
import re

THEEND = "TERMINATION-NOW-THE-END"


#read from file into Queue.Queue asynchronously
class QueueFile(threading.Thread):
    def run(self):
        if not(isinstance(self.myq, Queue)):
            print "Queue not set to a Queue"
            sys.exit(1)
        h = open(self.f, 'r')
        for l in h:
            self.myq.put(l.strip())  # this will block if the queue is full
        self.myq.put(THEEND)

    def set_queue(self, q):
        self.myq = q

    def set_file(self, f):
        self.f = f

一个工作线程的想法(仅为示例)
class myWorker(threading.Thread):
    def run(self):
        while(running):           
            try:
                data = self.q.get()  # read from fifo

                req = urllib2.Request("http://192.168.1.10/url/path")
                req.add_data(urllib.urlencode(data))
                h1 = urllib2.urlopen(req, timeout=10)
                res = h1.read()
                assert(len(res) > 80)

            except urllib2.HTTPError, e:
                print e

            except urllib2.URLError, e:
                print "done %d reqs " % n
                print e
                sys.exit()

创建基于 threading.Thread 的对象后,调用实例上的 "start" 方法来启动对象。

1
你需要在不同的线程中运行它,或使用内置的asyncore库。 大多数库将在您甚至不知道的情况下利用线程,或者依赖于Python的标准部分asyncore。
以下是Threading和asyncore的组合:
#!/usr/bin/python
# -*- coding: iso-8859-15 -*-
import asyncore, socket
from threading import *
from time import sleep
from os import _exit
from logger import *  # <- Non-standard library containing a log function
from config import *  # <- Non-standard library containing settings such as "server"

class logDispatcher(Thread, asyncore.dispatcher):
    def __init__(self, config=None):
        self.inbuffer = ''
        self.buffer = ''
        self.lockedbuffer = False
        self.is_writable = False

        self.is_connected = False

        self.exit = False
        self.initated = False

        asyncore.dispatcher.__init__(self)
        Thread.__init__(self)

        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            self.connect((server, server_port))
        except:
            log('Could not connect to ' + server, 'LOG_SOCK')
            return None

        self.start()

    def handle_connect_event(self):
        self.is_connected = True

    def handle_connect(self):
        self.is_connected = True
        log('Connected to ' + str(server), 'LOG_SOCK')

    def handle_close(self):
        self.is_connected = False
        self.close()

    def handle_read(self):
        data = self.recv(8192)
        while self.lockedbuffer:
            sleep(0.01)

        self.inbuffer += data


    def handle_write(self):
        while self.is_writable:
            sent = self.send(self.buffer)
            sleep(1)

            self.buffer = self.buffer[sent:]
            if len(self.buffer) <= 0:
                self.is_writable = False
            sleep(0.01)

    def _send(self, what):
        self.buffer += what + '\r\n'
        self.is_writable = True

    def run(self):
        self._send('GET / HTTP/1.1\r\n')

while 1:
    logDispatcher() # <- Initate one for each request.
    asyncore.loop(0.1)
    log('All threads are done, next loop in 10', 'CORE')
    sleep(10)

或者你可以创建一个执行任务后就结束的线程。

from threading import *
class worker(Thread):
    def __init__(self, host, postdata)
        Thread.__init__(self)
        self.host = host
        self.postdata = postdata
        self.start()
    def run(self):
        sock.send(self.postdata) #Pseudo, create the socket!

for data in postDataObjects:
    worker('example.com', data)

如果您需要限制线程数量(如果您发送超过5k个帖子,系统可能会变得繁重),只需执行while len(enumerate()) > 1000: sleep(0.1),让looper对象等待一些线程关闭即可。

0

将您的URL和参数放入列表中,然后逐个弹出一对到任务池中(这里的任务池要么只有一个任务,要么为空),创建线程,从任务池中读取任务,当一个线程获取任务并发送请求时,从列表中弹出另一个任务(即实际上这是一个队列列表)


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接