扭曲/执行异步http请求

4

我有一个监听传入数据的反应器。我有第二个反应器,在特定时间间隔内执行http请求,并将结果发送到第一个反应器。两者都运行良好。

现在我想将它们合并到一个反应器中运行,但我不知道如何实现。类似于在第一个“主”反应器内异步地每60秒执行一次http请求。

目前我的代码是:

# main reactor listening for incoming data forever
...
reactor.listenTCP(8123, TCPEventReceiverFactory())

HTTP反应器使用twisted.internet.defer.DeferredSemaphore()执行多个HTTP检查:

# create semaphore to manage the deferreds
semaphore = twisted.internet.defer.DeferredSemaphore(2)

# create a list with all urls to check
dl = list()
# append deferreds to list
for url in self._urls:
    # returns deferred
    dl.append(semaphore.run(self._getPage, url))

# get a DefferedList
dl = twisted.internet.defer.DeferredList(dl)
# add some callbacks for error handling
dl.addCallbacks(lambda x: reactor.stop(), self._handleError)

# start the reactor    
reactor.run()

我该如何将定时的http检查添加到“main”反应器中,以便以异步方式执行它们? DeferredSemaphore 的工作原理是什么?
有人能帮我解决这个问题吗?
[这是一种处理http检查结果的轻量级监控系统。 我对Twisted和异步编程很陌生。 我正在运行Python 2.7的Xubuntu 12.04操作系统。]

延迟信号量是用来做什么的? - SingleNegationElimination
它用于限制同时执行的请求。 - user937284
1个回答

6

你不需要多个反应器。只需使用同一个反应器执行所有不同的操作。

如果你在调用reactor.stop(),很可能正在做一些错误的事情,因此让我们把它们全部绑定到单个函数中(我们可以将其用作回调); 由于它正在执行异步工作,因此它还应该返回一个deferred。我们将使用你已经在使用的DeferredList

def thing_that_does_http():
    # create semaphore to manage the deferreds
    semaphore = twisted.internet.defer.DeferredSemaphore(2)

    # create a list with all urls to check
    dl = DeferredList()
    # append deferreds to list
    for url in self._urls:
        # returns deferred
        dl.append(semaphore.run(self._getPage, url))

    # get a DefferedList
    dl = twisted.internet.defer.DeferredList(dl)
    # add some callbacks for error handling
    dl.addErrback(self._handleError)
    return dl

在IT技术中,“在特定时间间隔内执行 x”的自然方式是使用循环调用。通过使用此回调函数,我们无需做太多工作。

reactor.listenTCP(8123, TCPEventReceiverFactory())
loop_http = twisted.intertnet.task.LoopingCall(thing_that_does_http)
# run once per minute, starting now.
loop_http.start(60)

在it技术中,LoopingCallgetPage将使用twisted.internet.reactor来完成它们自己的任务,如果你使用的是不同的反应器,例如进行单元测试时,你需要覆盖默认设置。

对于LoopingCall,很简单,在构造之后(但在调用其start()方法之前),只需设置其clock属性即可:

from twisted.internet.task import Clock
fake_reactor = Clock()
loop_http.clock = fake_reactor
fake_reactor.advance(120)  # move time forward two minutes...

不幸的是,getPage() 的情况不太好。你不能使用任何其他反应器与该接口一起使用;你需要使用更新、更亮眼的t.w.c.Agent。在许多方面,Agent 是优越的,但当你只想要原始响应体作为字符串时,它并不那么方便。

除了要求将显式反应器传递给其构造函数之外,它更多地关注于对请求/响应周期的细粒度控制,而不是getPage提供的方便。因此,它主要是通过ProducerProtocol实现的。在前者的情况下,我们可以传递一个方便的帮助程序FileBodyProducer,以最小的麻烦发送请求正文;在后者的情况下,我们需要一个简单的协议来缓冲所有数据块,直到我们获取了所有数据。

这里有一段代码可以替换getPage,大致具有相同的接口,但将Agent实例作为第一个参数。

from cStringIO import StringIO
from twisted.internet.defer import Deferred
from twisted.internet.protocol import Protocol
from twisted.web.client import ResponseDone
from twisted.web.client import FileBodyProducer


class GetPageProtocol(Protocol):
    def __init__(self):
        self.deferred = Deferred()
        self.data = []

    def dataReceived(self, data):
        self.data.append(data)

    def connectionLost(self, reason):
        reason.trap(ResponseDone)
        data = ''.join(self.data)
        del self.data
        self.deferred.callback(data)


def agentGetPage(agent, url,
                 method="GET",
                 headers=None,
                 postdata=None):
    if postdata is not None:
        bodyProducer = FileBodyProducer(StringIO(postdata))
    else:
        bodyProducer = None

    def _getPageResponded(response):
        if response.length != 0:
            proto = GetPageProtocol()
            response.deliverBody(proto)
            return proto.deferred
        else:
            return None

    d = agent.request(method, url, headers, bodyProducer)
    d.addCallback(_getPageResponded)
    return d

在单元测试中,它看起来会像这样:

from twisted.test.proto_helpers import MemoryReactor
from twisted.web.client import Agent
fake_reactor = MemoryReactor()
agent = Agent(fake_reactor)
d = agentGetPage(agent, "http://example.com")

assert fake_reactor.tcpClients  # or some such, exercise the code by manipulating the reactor

编辑:起初我想简单浏览一下,以便让ectomorph少一些困惑;但是现在早期灵活处理反应器并避免不必要的痛苦也是一个非常好的主意。


1
谢谢!太棒了,这正是我在寻找的。twisted.internet.reactor 是我正在使用的反应器。 - user937284
2
回答不错,但并非完全正确 :). LoopingCall 实际上会使用 self.clock,它只是默认情况下初始化为 twisted.internet.reactor。更改它的能力很重要,特别是用于测试。(可悲的是,getPage 实际上已经硬编码到了它,这也是我们现在推荐使用 twisted.web.client.Agent 的原因之一。) - Glyph
2
@Glyph:更新了:我认为这样涵盖了使用反应器的更少糟糕的部分。 - SingleNegationElimination
谢谢大家的完成! - user937284
@TokenMacGuy 感谢您为后人整理答案!如果您感到更加慷慨,实际上有一张票可以将像 agentGetPage 这样的函数添加到 Twisted 本身中,以弥合这种便利差距。您有兴趣参与吗?https://twistedmatrix.com/trac/ticket/5405 - Glyph

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接