龙卷风阻塞异步请求

14
使用Tornado,我发出一个GET请求,它会花费很长时间,因为它会向另一个Web服务发出许多请求并处理数据,可能需要数分钟才能完全完成。我不希望它阻塞整个Web服务器响应其他请求,但现在它确实如此。
据我所知,Tornado是单线程的,并且按顺序执行每个请求,即使它异步处理它们(我还有点困惑)。长过程中有一些部分可以作为暂停点,以允许服务器处理其他请求(可能是解决方案之一?)。我在Heroku上使用单个worker运行它,因此不确定如何将其转换为生成新线程或多进程,因为我在Python中没有经验。
这就是我正在尝试做的:客户端发出get调用以启动过程,然后我每5秒循环一次另一个get调用以检查状态并使用新信息更新页面(长轮询也可以工作,但遇到相同的问题)。问题在于开始长时间进程会阻止所有新的get请求(或新的长轮询会话)直到完成。
有没有一种简单的方法来启动这个长get调用并且不让它在该过程中阻塞整个Web服务器?在代码中是否有任何东西可以放置以表示“暂停,继续处理挂起的请求,然后再继续”?
我需要在ProcessHandler上初始化一个get请求。然后,当ProcessHandler运行时,我仍然需要能够查询StatusHandler。
示例:
class StatusHandler(tornado.web.RequestHandler):
    @tornado.web.asynchronous
    def get(self):
       self.render("status.html")

class ProcessHandler(tornado.web.RequestHandler):
    @tornado.web.asynchronous
    def get(self):
       self.updateStatus("0")
       result1 = self.function1()
       self.updateStatus("1")
       result2 = self.function2(result1)
       self.updateStatus("2")
       result3 = self.function3(result2)
       self.updateStatus("3")
       self.finish()

你尝试过tornado.gen模块吗?http://www.tornadoweb.org/documentation/gen.html - dorsh
你是否记得在异步调用上加注释了吗?在你的GET方法上添加@asynchronous。 - andy boot
andyboot,是的,我的GET方法上有@asynchronous。 - ElJeffe
唐,我尝试将函数包装在get.task中,但它仍然阻止了其他get请求。我已更新我的帖子以提供更好的想法,我想做什么。 - ElJeffe
2个回答

18

以下是一个完整的 Tornado 应用程序示例,使用异步 HTTP 客户端和 gen.Task 模块使事情变得简单。

如果您在文档中阅读有关 gen.Task 的更多信息,您会发现您实际上可以同时调度多个请求。这是使用 Tornado 的核心思想,即一切都是非阻塞的,并仍然维护单个进程。

更新:我已经添加了线程处理程序以演示您如何将工作分派到第二个线程并在完成时接收 callback()

import os
import threading
import tornado.options
import tornado.ioloop
import tornado.httpserver
import tornado.httpclient
import tornado.web
from tornado import gen
from tornado.web import asynchronous

tornado.options.define('port', type=int, default=9000, help='server port number (default: 9000)')
tornado.options.define('debug', type=bool, default=False, help='run in debug mode with autoreload (default: False)')

class Worker(threading.Thread):
   def __init__(self, callback=None, *args, **kwargs):
        super(Worker, self).__init__(*args, **kwargs)
        self.callback = callback

   def run(self):
        import time
        time.sleep(10)
        self.callback('DONE')

class Application(tornado.web.Application):
    def __init__(self):
        handlers = [
            (r"/", IndexHandler),
            (r"/thread", ThreadHandler),
        ]
        settings = dict(
            static_path = os.path.join(os.path.dirname(__file__), "static"),
            template_path = os.path.join(os.path.dirname(__file__), "templates"),
            debug = tornado.options.options.debug,
        )
        tornado.web.Application.__init__(self, handlers, **settings)

class IndexHandler(tornado.web.RequestHandler):
    client = tornado.httpclient.AsyncHTTPClient()

    @asynchronous
    @gen.engine
    def get(self):
        response = yield gen.Task(self.client.fetch, "http://google.com")

        self.finish("Google's homepage is %d bytes long" % len(response.body))

class ThreadHandler(tornado.web.RequestHandler):
    @asynchronous
    def get(self):
        Worker(self.worker_done).start()

    def worker_done(self, value):
        self.finish(value)

def main():
    tornado.options.parse_command_line()
    http_server = tornado.httpserver.HTTPServer(Application())
    http_server.listen(tornado.options.options.port)
    tornado.ioloop.IOLoop.instance().start()

if __name__ == "__main__":
    main()

我将我的函数包装在gen.Task中,但它仍然做了同样的事情。我创建了一个get,其中有多个response = get.Tasks()。我不需要它们同时执行...实际上它们需要串行,但是当这个get请求正在处理时,任何其他get请求都会被阻塞。 - ElJeffe
在你的例子中,self.function1() 是一个纯粹的 Python 函数,它不会调用任何外部服务?最初的假设是它调用了另一个服务,而你被阻止了。 - koblas
1
你需要查找并替换urllib2.urlopen()函数为Tornado AsyncHTTPClient版本。因为urlopen()会阻塞直到接收到数据,而AsyncHTTPClient会将控制权返回到ioloop。 - koblas
@xingzhi.sg,我相信你是对的:http://www.tornadoweb.org/en/stable/web.html#thread-safety-notes - rectummelancolique
1
感谢你提供的解决方案。 但是,如果 /thread 正在加载,并且您为 /thread 打开了另一个选项卡,它仍然会被阻止。 为什么? - MK Yung
显示剩余5条评论

6

koblas的解决方案很棒。这里有一个替代方案,它利用了tornado.gen

import tornado.ioloop
import tornado.web
import tornado.gen
import tornado.concurrent
import time
from threading import Thread
from functools import wraps

def run_async(func):
  @wraps(func)
  def async_func(*args, **kwargs):
    func_hl = Thread(target = func, args = args, kwargs = kwargs)
    func_hl.start()
    return func_hl

  return async_func

@run_async
def sleeper(callback):
  i = 0
  while i <= 10:
    print i
    time.sleep(1)
    i += 1
  callback('DONE')


class MainHandler(tornado.web.RequestHandler):
    @tornado.web.asynchronous
    @tornado.gen.coroutine
    def get(self):
        response = yield tornado.gen.Task(sleeper)
        self.write(response)
        self.finish()

class OtherHandler(tornado.web.RequestHandler):
    def get(self):
        self.write('hello world')
        print 'in other'
        self.finish()

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接