如何在Python3的concurrent.futures中检测异常?

18

由于其并发 futures 模块,我刚刚开始使用 Python3 。我想知道能否让它检测错误。如果有更高效的模块,请告诉我,因为我想使用并发 futures 进行并行编程。

我不喜欢 multiprocessing,因为它过于复杂,而且没有太多的文档说明。但是,如果有人可以使用 multiprocessing 写一个仅使用函数而非类来进行并行计算的 Hello World 程序,那将很好理解。

这是一个简单的脚本:

from concurrent.futures import ThreadPoolExecutor

def pri():
    print("Hello World!!!")

def start():
    try:
        while True:
            pri()
    except KeyBoardInterrupt:
        print("YOU PRESSED CTRL+C")


with ThreadPoolExecutor(max_workers=3) as exe:
    exe.submit(start)

以上代码只是演示如何使用CTRL+C无法打印语句。

我想要的是,在出现错误时能够调用一个函数。这个错误检测必须来自于函数本身。

另一个例子

import socket
from concurrent.futures import ThreadPoolExecutor 
s = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
def con():
    try:
        s.connect((x,y))
        main()
    except: socket.gaierror
         err()
def err():
    time.sleep(1)
    con()
def main():
    s.send("[+] Hello")
with ThreadPoolExecutor as exe:
    exe.submit(con)

1
所以你想捕获 KeyBoardInterrupt?这是你在问什么吗? - laike9m
那么你有什么问题? - laike9m
你想要能够打印“你按下了Ctrl+C键”吗? - laike9m
让我们在聊天中继续这个讨论:点击此处 - user5327424
检测concurrent.futures中失败的任务 - Ciro Santilli OurBigBook.com
2个回答

20

很晚才加入这场聚会,但也许对其他人有帮助...

我相信原问题并没有得到真正的回答。大家卡在了user5327424使用键盘中断来触发异常的事实上,而实际关键是无论由何种原因引起的异常都没有被触发。例如:

import concurrent.futures


def main():
    numbers = range(10)

    with concurrent.futures.ThreadPoolExecutor() as executor:
        results = {executor.submit(raise_my_exception, number): number for number in numbers}


def raise_my_exception(number):
    print('Proof that this function is getting called. %s' % number)
    raise Exception('This never sees the light of day...')


main()
当上述示例代码被执行时,你会在屏幕上看到打印语句内的文本,但是你永远不会看到异常。这是因为每个线程的结果都保存在results对象中。你需要迭代该对象才能访问你的异常。下面的示例演示了如何访问结果。
import concurrent.futures


def main():
    numbers = range(10)

    with concurrent.futures.ThreadPoolExecutor() as executor:
        results = {executor.submit(raise_my_exception, number): number for number in numbers}

    for result in results:
        # This will cause the exception to be raised (but only the first one)
        print(result.result())


def raise_my_exception(number):
    print('Proof that this function is getting called. %s' % number)
    raise Exception('This will be raised once the results are iterated.')


main()

我不确定我是否喜欢这种行为,但它确实允许线程完全执行,而不管单个线程内发生的异常。


4

这里有一个解决方案。我不确定你是否喜欢,但我想不到其他的方法了。我已经修改了你的代码使其可用。

from concurrent.futures import ThreadPoolExecutor
import time

quit = False

def pri():
    print("Hello World!!!")

def start():
    while quit is not True:
        time.sleep(1)
        pri()

try:
    pool = ThreadPoolExecutor(max_workers=3)
    pool.submit(start)

    while quit is not True:
        print("hei")
        time.sleep(1)
except KeyboardInterrupt:
    quit = True

以下是要点:
  1. When you use with ThreadPoolExecutor(max_workers=3) as exe, it waits until all tasks have been done. Have a look at Doc

    If wait is True then this method will not return until all the pending futures are done executing and the resources associated with the executor have been freed. If wait is False then this method will return immediately and the resources associated with the executor will be freed when all pending futures are done executing. Regardless of the value of wait, the entire Python program will not exit until all pending futures are done executing.

    You can avoid having to call this method explicitly if you use the with statement, which will shutdown the Executor (waiting as if Executor.shutdown() were called with wait set to True)

    It's like calling join() on a thread.
    That's why I replaced it with:

    pool = ThreadPoolExecutor(max_workers=3)
    pool.submit(start)
    
  2. Main thread must be doing "work" to be able to catch a Ctrl+C. So you can't just leave main thread there and exit, the simplest way is to run an infinite loop

  3. Now that you have a loop running in main thread, when you hit CTRL+C, program will enter the except KeyboardInterrupt block and set quit=True. Then your worker thread can exit.

严格来说,这只是一个解决方法。在我看来,似乎没有其他的方法可以解决这个问题。

编辑
我不确定你有什么困扰,但你可以在另一个线程中捕获异常而没有问题:

import socket
import time
from concurrent.futures import ThreadPoolExecutor 
s = socket.socket(socket.AF_INET,socket.SOCK_STREAM)

def con():
    try:
        raise socket.gaierror
        main()
    except socket.gaierror:
        print("gaierror occurred")
        err()

def err():
    print("err invoked")
    time.sleep(1)
    con()

def main():
    s.send("[+] Hello")

with ThreadPoolExecutor(3) as exe:
    exe.submit(con)

输出

gaierror occurred
err invoked
gaierror occurred
err invoked
gaierror occurred
err invoked
gaierror occurred
...

对我来说不起作用,它实际上没有连接并进入main()函数。 - user5327424
@IsithaSubasinghe 我明白了。我没有编写可以实际使用的代码,只是为了证明我的观点。我在 main 之前引发了一个异常,当然你无法到达 main - laike9m
它只是跳过了 connect((x,y)),但输出结果相同。 - user5327424
这是我的真正问题:http://stackoverflow.com/questions/33513017/parallel-compute-task-to-brute-force-in-python。我发布了上述内容,以便其他人可以从中受益,因为Python并行性很麻烦。 - user5327424
@IsithaSubasinghe,没有愚蠢的问题,你真的不需要道歉。 - laike9m
显示剩余9条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接