如何在Python的concurrent.futures中打破time.sleep()函数

9

我正在尝试使用concurrent.futures

目前我的future调用了time.sleep(secs)

似乎Future.cancel()的作用不及我所想。

如果future已经在执行,那么time.sleep()将无法被取消。

wait()的超时参数同样无法取消time.sleep()

如何取消并发执行的time.sleep()

我使用ThreadPoolExecutor进行测试。


短回答 - 没门,大多数情况下,在工作线程中使用sleep意味着设计有问题。长回答 - 你总是能够实现自定义的sleep并且可以打破它们,但这既不符合Python风格也不正确。作为替代方案,您可以检查锁的使用。 - Reishin
5个回答

7
如果您向ThreadPoolExecutor提交一个函数,执行器将在线程中运行该函数,并将其返回值存储在Future对象中。由于并发线程的数量是有限的,因此您可以选择取消未来的挂起执行,但一旦控制权在工作线程中传递给可调用项,就无法停止执行。
考虑以下代码:
import concurrent.futures as f
import time

T = f.ThreadPoolExecutor(1) # Run at most one function concurrently
def block5():
    time.sleep(5)
    return 1
q = T.submit(block5)
m = T.submit(block5)

print q.cancel()  # Will fail, because q is already running
print m.cancel()  # Will work, because q is blocking the only thread, so m is still queued

一般来说,每当您想要一个可取消的东西时,您自己负责确保它是可取消的。但也有一些现成的选择可供使用。例如,考虑使用asyncio,他们还有一个使用sleep的示例。该概念通过在调用任何可能阻塞的操作时,将控制返回给运行在最外层上下文中的控制循环,并附带一条指示在结果可用后继续执行的注释 - 或者,在您的情况下,经过n秒后继续执行。

哦,多么有趣 :-) 我从 multiprocessing 切换到 concurrent.futures(出于其他原因)。现在我正在考虑从 concurrent.futures 切换到 asyncio ... :-). 尽管如此,Phillip,谢谢你的回答! - guettli
不客气。顺便说一下,使用multiprocessing,可以中断sleep,因为你当然可以“杀死”其他进程。 - Phillip
据我所知,Linux会为每个新进程增加PID,如果达到上限则循环。这种情况发生的可能性非常小。但你是对的:这是一种竞态条件。 - guettli
在这方面,多进程和concurrent.futures有什么不同?据我所知,两者都使用进程池。 - guettli
通过使用多进程,您可以直接使用“Process”类来插入自己的保护措施。当然,您也可以子类化“ProcessPoolExecutor”并添加安全终止功能。 - Phillip
显示剩余9条评论

2

我不太了解concurrent.futures,但是你可以使用这个逻辑来控制时间。使用循环而不是sleep.time()或wait()。

for i in range(sec):
    sleep(1)

可以使用中断或者break语句来跳出循环。


2
是的,这可能有效。感觉像芬兰的一个人只想通过拨号连接读取他的邮件...嗯,我需要一个事件循环....我需要一个调度程序....最后它就是一个操作系统。 - guettli

1
我已经明白了。
以下是一个例子:
from concurrent.futures import ThreadPoolExecutor
import queue
import time

class Runner:
    def __init__(self):
        self.q = queue.Queue()
        self.exec = ThreadPoolExecutor(max_workers=2)

    def task(self):
        while True:
            try:
                self.q.get(block=True, timeout=1)
                break
            except queue.Empty:
                pass
            print('running')

    def run(self):
        self.exec.submit(self.task)

    def stop(self):
        self.q.put(None)
        self.exec.shutdown(wait=False,cancel_futures=True)

r = Runner()
r.run()
time.sleep(5)
r.stop()

0

我最近也遇到了这个问题。我有两个任务需要同时运行,其中一个任务需要不时地休眠。在下面的代码中,假设task2是需要休眠的任务。

from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=2)
executor.submit(task1)
executor.submit(task2)

executor.shutdown(wait=True)

为了避免无休止的休眠,我将task2提取出来以同步方式运行。我不知道这是否是一个好的实践,但在我的场景中它简单且完美适用。
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=1)
executor.submit(task1)

task2()

executor.shutdown(wait=True)

也许对其他人有用。

-1
正如 链接 中所写的那样,您可以使用 with 语句以确保线程及时清理,就像下面的示例一样:
import concurrent.futures
import urllib.request

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://some-made-up-domain.com/']

# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()

# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # Start the load operations and mark each future with its URL
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (url, exc))
        else:
            print('%r page is %d bytes' % (url, len(data)))

请您解释一下时间休眠部分的内容,谢谢。 - B. Okba

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接