import tornado.ioloop
import tornado.web
class MainHandler(tornado.web.RequestHandler):
def get(self):
self.write("Hello, world")
application = tornado.web.Application([
(r"/", MainHandler),
])
if __name__ == "__main__":
application.listen(8888)
tornado.ioloop.IOLoop.instance().start()
一旦调用tornado.ioloop.IOLoop.instance().start()
,它将阻塞程序(或当前线程)。在查看IOLoop
对象的源代码时,文档中为stop
函数给出了以下示例:
To use asynchronous methods from otherwise-synchronous code (such as
unit tests), you can start and stop the event loop like this:
ioloop = IOLoop()
async_method(ioloop=ioloop, callback=ioloop.stop)
ioloop.start()
ioloop.start() will return after async_method has run its callback,
whether that callback was invoked before or after ioloop.start.
然而,我不知道如何将其集成到我的程序中。实际上,我有一个封装了 Web 服务器的类(拥有自己的 start
和 stop
函数),但是一旦我调用 start,程序(或测试)肯定会被阻塞。
我尝试在另一个进程中启动 Web 服务器(使用 multiprocessing
包)。这是包装 Web 服务器的类:
class Server:
def __init__(self, port=8888):
self.application = tornado.web.Application([ (r"/", Handler) ])
def server_thread(application, port):
http_server = tornado.httpserver.HTTPServer(application)
http_server.listen(port)
tornado.ioloop.IOLoop.instance().start()
self.process = Process(target=server_thread,
args=(self.application, port,))
def start(self):
self.process.start()
def stop(self):
ioloop = tornado.ioloop.IOLoop.instance()
ioloop.add_callback(ioloop.stop)
然而,stop
方法似乎并不能完全停止 Web 服务器,因为在下一个测试中它仍然在运行,即使使用了此测试设置:
def setup_method(self, _function):
self.server = Server()
self.server.start()
time.sleep(0.5) # Wait for web server to start
def teardown_method(self, _function):
self.kstore.stop()
time.sleep(0.5)
如何在Python程序内部启动和停止Tornado Web服务器?
.close()
方法以停止服务器!请参见停止tornado应用程序。 - ti7