如何编写一个测试并发的 Tornado 单元测试

5

我希望确保我的Tornado处理程序不会阻塞,因此我想编写一些单元测试作为安全检查。

我打算编写一个异步休眠2秒的处理程序。在测试中,我想连续调用这个处理程序两次,以模拟“同时”请求。

如果我没有错,这两个请求应该是并发运行的,因此在小于4秒的时间内完成。问题是我不知道如何通过AsyncHTTPTestCase进行两个同时的请求来测试我的应用程序。

以下是我目前的代码:

class SyncSleepHandler(tornado.web.RequestHandler):
    def get(self):
        time.sleep(2)


class AsyncSleepHandler(tornado.web.RequestHandler):
    @gen.coroutine
    def get(self):
        yield gen.sleep(2)


class SleepTest(AsyncHTTPTestCase):
    def get_app(self):
        return Application([(r'/sync', SyncSleepHandler),
                            (r'/async', AsyncSleepHandler)], debug=True)

    def test_async_sleep(self):
        start = time.time()
        resp1 = self.fetch(r'/async', method='GET')
        resp2 = self.fetch(r'/async', method='GET')
        diff = time.time() - start
        self.assertTrue(2 < diff < 4, msg="Difference is {:}".format(diff))
1个回答

5

AsyncHTTPTestCase.fetch 控制 IOLoop 并进行单次抓取,因此无法用于这种类型的测试,但您可以直接使用底层的 self.http_client,并使用 @tornado.testing.gen_test 控制 IOLoop

@gen_test
def test_async_sleep(self):
    start = time.time()
    resp1, resp2 = yield [
        self.http_client.fetch(self.get_url('/async')),
        self.http_client.fetch(self.get_url('/async')),
    ]
    diff = time.time() - start

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接