我希望确保我的Tornado处理程序不会阻塞,因此我想编写一些单元测试作为安全检查。
我打算编写一个异步休眠2秒的处理程序。在测试中,我想连续调用这个处理程序两次,以模拟“同时”请求。
如果我没有错,这两个请求应该是并发运行的,因此在小于4秒的时间内完成。问题是我不知道如何通过AsyncHTTPTestCase
进行两个同时的请求来测试我的应用程序。
以下是我目前的代码:
class SyncSleepHandler(tornado.web.RequestHandler):
def get(self):
time.sleep(2)
class AsyncSleepHandler(tornado.web.RequestHandler):
@gen.coroutine
def get(self):
yield gen.sleep(2)
class SleepTest(AsyncHTTPTestCase):
def get_app(self):
return Application([(r'/sync', SyncSleepHandler),
(r'/async', AsyncSleepHandler)], debug=True)
def test_async_sleep(self):
start = time.time()
resp1 = self.fetch(r'/async', method='GET')
resp2 = self.fetch(r'/async', method='GET')
diff = time.time() - start
self.assertTrue(2 < diff < 4, msg="Difference is {:}".format(diff))