asyncio:是否可以取消由Executor运行的future?

33

我想使用异步调用的loop.run_in_executor方法在Executor中启动一个阻塞函数,然后稍后取消它,但这似乎对我不起作用。

以下是代码:

import asyncio
import time

from concurrent.futures import ThreadPoolExecutor


def blocking_func(seconds_to_block):
    for i in range(seconds_to_block):
        print('blocking {}/{}'.format(i, seconds_to_block))
        time.sleep(1)

    print('done blocking {}'.format(seconds_to_block))


@asyncio.coroutine
def non_blocking_func(seconds):
    for i in range(seconds):
        print('yielding {}/{}'.format(i, seconds))
        yield from asyncio.sleep(1)

    print('done non blocking {}'.format(seconds))


@asyncio.coroutine
def main():
    non_blocking_futures = [non_blocking_func(x) for x in range(1, 4)]
    blocking_future = loop.run_in_executor(None, blocking_func, 5)
    print('wait a few seconds!')
    yield from asyncio.sleep(1.5)

    blocking_future.cancel()
    yield from asyncio.wait(non_blocking_futures)



loop = asyncio.get_event_loop()
executor = ThreadPoolExecutor(max_workers=1)
loop.set_default_executor(executor)
asyncio.async(main())
loop.run_forever()

我期望上述代码只允许阻塞函数输出:

blocking 0/5
blocking 1/5

然后查看非阻塞函数的输出,但是实际上阻塞的future仍会继续执行,即使我已经取消了。

这种情况可能吗?还有其他方法可以解决吗?

谢谢

编辑:关于使用asyncio运行阻塞和非阻塞代码的更多讨论:如何使用asyncio与阻塞和非阻塞代码进行交互

2个回答

32
在这种情况下,一旦Future实际开始运行,就没有办法取消它,因为您依赖于concurrent.futures.Future的行为,而且其文档如下所述: cancel() 尝试取消调用。如果正在执行调用且无法取消,则方法将返回False,否则将取消调用并返回True
因此,唯一成功取消的时机是任务仍处于Executor中等待。现在,您实际上使用的是一个封装在concurrent.futures.Future周围的asyncio.Future,在实践中,loop.run_in_executor()返回的asyncio.Future如果您尝试在调用cancel()后使用yield from,即使基础任务实际上已经在运行,也会引发CancellationError。但是,它不会真正取消Executor内部任务的执行。
如果您需要真正取消任务,您需要使用更常规的方法来中断线程中正在运行的任务。如何执行此操作取决于用例。对于您在示例中提出的用例,您可以使用threading.Event
def blocking_func(seconds_to_block, event):
    for i in range(seconds_to_block):
        if event.is_set():
            return
        print('blocking {}/{}'.format(i, seconds_to_block))
        time.sleep(1)

    print('done blocking {}'.format(seconds_to_block))


...
event = threading.Event()
blocking_future = loop.run_in_executor(None, blocking_func, 5, event)
print('wait a few seconds!')
yield from asyncio.sleep(1.5)

blocking_future.cancel()  # Mark Future as cancelled
event.set() # Actually interrupt blocking_func

谢谢您的回复。您提出的解决方案听起来很适合解决这个问题。我的问题是,在我的实际情况中,blocking_func是第三方提供的。我会等一两天看看是否有其他建议;否则我将接受您的答案。 - Brendan Maguire
你知道如何在blocking_func完成时添加回调吗? - luisgepeto
@dano,我尝试了你取消这段代码的方法,但它似乎卡住了。我在这里提出了一个相关问题:https://dev59.com/j734oIgBc1ULPQZF1XTx - Ja8zyjits

2

由于线程共享进程的内存地址空间,因此没有安全的方法来终止正在运行的线程。这就是为什么大多数编程语言不允许杀死正在运行的线程的原因(有很多丑陋的黑客手段可以规避这个限制)。

Java 通过 艰难的方式 学会了这一点。

一个解决方案是将您的函数在单独的进程中运行,而不是在线程中,并优雅地终止它。

Pebble 库提供了类似于 concurrent.futures 的接口,支持取消运行的 Futures

from pebble import ProcessPool

def function(foo, bar=0):
    return foo + bar

with ProcessPool() as pool:
    future = pool.schedule(function, args=[1])

    # if running, the container process will be terminated 
    # a new process will be started consuming the next task
    future.cancel()  

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接