异步编程中的同步睡眠问题:将同步睡眠转化为asyncio协程形式

11

我有一个如下的协程:

async def download():
    downloader = DataManager()
    downloader.download()

DataManager.download() 方法的样子如下:

def download(self):
    start_multiple_docker_containers()
    while True:
        check_containers_statuses()
        sleep(N)  # synchronous sleep from time module

这是一种好的实践方法吗?如果不是,我应该如何在 download() 中使用 asyncio.sleep 呢?

或者说,这种代码结构从概念上讲是错误的吗?


1
你能将 DataManager.download 改为协程吗? - dirn
当然可以。但是,如果我想将代码分解为不同的方法,则必须将它们也作为协程。如果下载任务过多,我担心这会对性能产生不利影响。 - A.Smith
3个回答

7

这是我的解决方案:

import asyncio
import time


# Mocks of domain-specific functions
# ----------------------------------

def get_container_status(container_id, initial_time):
    """This mocks container status to change to 'exited' in 10 seconds"""
    if time.time() - initial_time < 10:
        print("%s: container %s still running" % (time.time(), container_id))
        return 'running'
    else:
        print("%s: container %s exited" % (time.time(), container_id))
        return 'exited'

def is_download_complete(container_id, initial_time):
    """This mocks download finished in 20 seconds after program's start"""
    if time.time() - initial_time < 20:
        print("%s: download from %s in progress" % (time.time(), container_id))
        return False
    else:
        print("%s: download from %s done" % (time.time(), container_id))
        return True

def get_downloaded_data(container_id):
    return "foo"


# Coroutines
# ----------

async def container_exited(container_id, initial_time):
    while True:
        await asyncio.sleep(1) # == setTimeout(1000), != sleep(1000)
        if get_container_status(container_id, initial_time) == 'exited':
            return container_id

async def download_data_by_container_id(container_id, initial_time):
    container_id = await container_exited(container_id, initial_time)
    while True:
        await asyncio.sleep(1)
        if is_download_complete(container_id, initial_time):
            return get_downloaded_data(container_id)


# Main loop
# ---------

if __name__ == "__main__":

    initial_time = time.time()

    loop = asyncio.get_event_loop()

    tasks = [
        asyncio.ensure_future(download_data_by_container_id("A", initial_time)),
        asyncio.ensure_future(download_data_by_container_id("B", initial_time))
    ]

    loop.run_until_complete(asyncio.wait(tasks))

    loop.close()

结果为:

1487334722.321165: container A still running
1487334722.321412: container B still running
1487334723.325897: container A still running
1487334723.3259578: container B still running
1487334724.3285959: container A still running
1487334724.328662: container B still running
1487334725.3312798: container A still running
1487334725.331337: container B still running
1487334726.3340318: container A still running
1487334726.33409: container B still running
1487334727.336779: container A still running
1487334727.336842: container B still running
1487334728.339425: container A still running
1487334728.339506: container B still running
1487334729.34211: container A still running
1487334729.342168: container B still running
1487334730.3448708: container A still running
1487334730.34493: container B still running
1487334731.34754: container A exited
1487334731.347598: container B exited
1487334732.350253: download from A in progress
1487334732.3503108: download from B in progress
1487334733.354369: download from A in progress
1487334733.354424: download from B in progress
1487334734.354686: download from A in progress
1487334734.3548028: download from B in progress
1487334735.358371: download from A in progress
1487334735.358461: download from B in progress
1487334736.3610592: download from A in progress
1487334736.361115: download from B in progress
1487334737.363115: download from A in progress
1487334737.363211: download from B in progress
1487334738.3664992: download from A in progress
1487334738.36656: download from B in progress
1487334739.369131: download from A in progress
1487334739.36919: download from B in progress
1487334740.371079: download from A in progress
1487334740.37119: download from B in progress
1487334741.374521: download from A done
1487334741.3745651: download from B done

关于 sleep() 函数,不应该使用它。它会阻塞整个 Python 解释器 1 秒钟,这不是你想要的结果。
记住,你没有并行性(线程等),只有并发性。也就是说,你有一个 Python 解释器,只有一个执行线程,其中你的主循环和所有的协程运行,并且会抢占彼此。你希望你的解释器在 asyncio 创建的主循环中花费其工作时间的 99.999%,轮询套接字并等待超时。
所有的协程都应该尽快返回,绝对不应包含阻塞的 sleep - 如果调用它,会阻塞整个解释器,并防止主循环从套接字获取信息或响应到达这些套接字的数据运行协程。
因此,你应该等待 asyncio.sleep(),它本质上相当于 JavaScript 的 setTimeout(),它告诉主循环,在某个时间内它应唤醒此协程并继续运行它。
建议阅读:

4

这很可能是一种不好的做法,因为time.sleep()会阻塞所有内容,而你只想阻塞特定的协程(我猜测)。

你正在异步世界中进行同步操作。

以下模式如何?

async def download():
    downloader = DataManager()
    downloader.start_multiple_docker_containers()
    while True:
        downloader.check_containers_statuses()
        await syncio.sleep(N)

它运行良好。不知道为什么,但在昨晚,像这样的代码:async def some1(): await some2(); async def some2(): await someN()看起来不太美观。 谢谢! - A.Smith

0

我对asyncio还很陌生,但是如果你像这样运行同步代码:

f = app.loop.run_in_executor(None, your_sync_function, app,param1,param2,...)

那么your_sync_function会在一个单独的线程中运行,你可以使用time.sleep()而不会影响asyncio循环。它会阻塞循环执行器的线程,但不会阻塞asyncio线程。至少,这是它看起来所做的。

如果你想从your_sync_function发送消息回到asyncio的循环,请查看janus库。

更多提示:

https://carlosmaniero.github.io/asyncio-handle-blocking-functions.html


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接