类似于 multiprocessing Pool 的线程池?

419

是否有一个类似于多进程模块 Pool class 的工作线程池类?

例如,我喜欢并行化映射函数的简单方法。

def long_running_func(p):
    c_func_no_gil(p)

p = multiprocessing.Pool(4)
xs = p.map(long_running_func, range(100))

然而,我希望在不创建新进程的情况下完成它。

我知道GIL的存在。但是,在我的用例中,该函数将是一个IO绑定的C函数,在实际函数调用之前,Python包装器将释放GIL。

我需要编写自己的线程池吗?


这里有一个看起来很有前途的东西,它在Python Cookbook中:Recipe 576519: Thread pool with same API as (multi)processing.Pool (Python) - otherchirps
3
现在它已经内置了:from multiprocessing.pool import ThreadPool - martineau
你能详细说明一下这句话的意思吗?“我知道GIL的存在。但是在我的使用场景中,该函数将是一个IO绑定的C函数,Python包装器将在实际函数调用之前释放GIL。” - mrgloom
请返回翻译文本。 - Darklighter
11个回答

544

我刚刚发现在multiprocessing模块中实际上有一个基于线程的池接口,但它有些隐藏且文档不够完善。

可以通过以下方式导入:

from multiprocessing.pool import ThreadPool

这是使用一个包装Python线程的虚拟Process类实现的。这个基于线程的Process类可以在multiprocessing.dummy中找到,docs中简要提到了它。这个虚拟模块据说是基于线程提供了整个多进程接口。


5
太棒了。我在创建非主线程的线程池时遇到了问题,不过一旦创建后,你可以在子线程中使用它们。我已经提交了一个问题:http://bugs.python.org/issue10015 - Olson
96
我不明白为什么这个类没有文档。如今,这种辅助类非常重要。 - Wernight
20
主要原因是没有人提供一个类似于threading.ThreadPool的补丁程序(或类似内容),并包含文档和测试,因此它没有公开。将它作为标准库中的好功能是很不错的,但如果没有人撰写相关程序,这是不可能实现的。现有的multiprocessing实现的一个好处是,它可以让任何类似于线程池的补丁程序变得更加容易编写。 - ncoghlan
4
multiprocessing.dummy.Pool/multiprocessing.pool.ThreadPool是同一事物,它们都是线程池。它们模拟了进程池的接口,但完全基于线程实现。请重新阅读文档,你搞反了。 - ShadowRanger
11
multiprocessing.dummy 是一个围绕着 threading 模块的简单封装,它复制了 multiprocessing 的 API。一般来说,multiprocessing 是关于进程的,但为了允许在进程和线程之间进行切换,他们使用了 multiprocessing.dummy 在后台支持线程来(大多数情况下)复制 multiprocessing 的 API。这样做的目的是使您可以通过 import multiprocessing.dummy as multiprocessing 将基于进程的代码更改为基于线程的代码。 - ShadowRanger
显示剩余11条评论

279

6
为了使用后移的futures模块,请运行“sudo pip install futures”。 - yair
这是多进程中最高效且最快速的方法。 - Haritsinh Gohil
14
使用 ThreadPoolExecutormultiprocessing.dummy.Pool 有什么区别? - Jay
3
concurrent.futures 在 Python 3.9 / 3.10 初期是一个非常有问题的库。看起来它被许多没有得到适当修复的 bug 所困扰。也许,整个这个库的前提就是错误的。我更熟悉这个库中基于进程的部分,在那里池子永远会挂起、吞噬错误并以其他方式表现不良,原因不胜枚举。我会尽可能远离这个库。 - wvxvw
这里描述了concurrent.futures的一些陷阱和限制:https://github.com/yeraydiazdiaz/futureproof#readme - Justas
对于那些对ThreadPoolExecutorThreadPool之间的区别感兴趣的人,最近的一个SO答案有一个很好的解释:https://dev59.com/5ngPtIcB2Jgan1zna6WC#73351707 - Karthic Raghupathi

74

是的,而且它似乎有(多多少少)相同的API。

import multiprocessing

def worker(lnk):
    ....    
def start_process():
    .....
....

if(PROCESS):
    pool = multiprocessing.Pool(processes=POOL_SIZE, initializer=start_process)
else:
    pool = multiprocessing.pool.ThreadPool(processes=POOL_SIZE, 
                                           initializer=start_process)

pool.map(worker, inputs)
....

14
ThreadPool的导入路径与Pool不同,正确的导入方式是from multiprocessing.pool import ThreadPool - Marigold
2
奇怪的是,这不是一个记录在案的API,而multiprocessing.pool只是简单地提供了AsyncResult。但它在2.x和3.x中都可用。 - Marvin
3
这正是我在寻找的。只需要添加一行导入代码和对现有池代码进行微小修改即可完美运行。 - Danegraphics

51

这是一个非常简单且轻量级的实现(稍作修改自这里):

from Queue import Queue
from threading import Thread


class Worker(Thread):
    """Thread executing tasks from a given tasks queue"""
    def __init__(self, tasks):
        Thread.__init__(self)
        self.tasks = tasks
        self.daemon = True
        self.start()

    def run(self):
        while True:
            func, args, kargs = self.tasks.get()
            try:
                func(*args, **kargs)
            except Exception, e:
                print e
            finally:
                self.tasks.task_done()


class ThreadPool:
    """Pool of threads consuming tasks from a queue"""
    def __init__(self, num_threads):
        self.tasks = Queue(num_threads)
        for _ in range(num_threads):
            Worker(self.tasks)

    def add_task(self, func, *args, **kargs):
        """Add a task to the queue"""
        self.tasks.put((func, args, kargs))

    def wait_completion(self):
        """Wait for completion of all the tasks in the queue"""
        self.tasks.join()

if __name__ == '__main__':
    from random import randrange
    from time import sleep

    delays = [randrange(1, 10) for i in range(100)]

    def wait_delay(d):
        print 'sleeping for (%d)sec' % d
        sleep(d)

    pool = ThreadPool(20)

    for i, d in enumerate(delays):
        pool.add_task(wait_delay, d)

    pool.wait_completion()

要支持任务完成后的回调,只需将回调函数添加到任务元组中即可。


如果线程无条件地进入无限循环,它们如何加入? - Joseph Garvin
@JosephGarvin 我已经测试过了,线程会一直阻塞在空队列上(因为调用Queue.get()是阻塞的),直到程序结束,然后它们会自动终止。 - forumulator
@JosephGarvin,好问题。Queue.join()实际上会加入任务队列,而不是工作线程。因此,当队列为空时,wait_completion返回,程序结束,并由操作系统回收线程。 - randomir
如果将所有这些代码整理到一个整齐的函数中,即使队列为空并且 pool.wait_completion() 返回,它似乎也无法停止线程。结果是线程继续增加。 - ubiquibacon

26

你可以使用以下的Python库来使用线程池:

from multiprocessing.dummy import Pool as ThreadPool

然后,要使用此库,请按照以下方式操作:

pool = ThreadPool(threads)
results = pool.map(service, tasks)
pool.close()
pool.join()
return results

线程数指您需要的线程数量,任务是一个任务列表,大多数任务必须映射到服务。


谢谢,这是一个很好的建议!从文档中可以看出:multiprocessing.dummy 复制了 multiprocessing 的 API,但实际上只是 threading 模块的包装器。有一个更正 - 我认为你想说的是 pool api 是 (function,iterable)。 - layser
2
我们错过了 .close().join() 的调用,这导致 .map() 在所有线程完成之前就结束了。只是一个警告。 - Anatoly Scherbakov
非常棒的解决方案,非常优雅! - Lorenzo Bassetti

17

是的,有一个类似于多进程池的线程池,但是它有些隐藏并且没有很好地记录文档。你可以通过以下方式导入它:

from multiprocessing.pool import ThreadPool

我只是展示一个简单的例子

def test_multithread_stringio_read_csv(self):
        # see gh-11786
        max_row_range = 10000
        num_files = 100

        bytes_to_df = [
            '\n'.join(
                ['%d,%d,%d' % (i, i, i) for i in range(max_row_range)]
            ).encode() for j in range(num_files)]
        files = [BytesIO(b) for b in bytes_to_df]

        # read all files in many threads
        pool = ThreadPool(8)
        results = pool.map(self.read_csv, files)
        first_result = results[0]

        for result in results:
            tm.assert_frame_equal(first_result, result) 

1
我认为这应该成为被接受的答案。 - maxywb

14

这是我最终使用的结果。它是上面dgorissen类的修改版本。

文件:threadpool.py

from queue import Queue, Empty
import threading
from threading import Thread


class Worker(Thread):
    _TIMEOUT = 2
    """ Thread executing tasks from a given tasks queue. Thread is signalable, 
        to exit
    """
    def __init__(self, tasks, th_num):
        Thread.__init__(self)
        self.tasks = tasks
        self.daemon, self.th_num = True, th_num
        self.done = threading.Event()
        self.start()

    def run(self):       
        while not self.done.is_set():
            try:
                func, args, kwargs = self.tasks.get(block=True,
                                                   timeout=self._TIMEOUT)
                try:
                    func(*args, **kwargs)
                except Exception as e:
                    print(e)
                finally:
                    self.tasks.task_done()
            except Empty as e:
                pass
        return

    def signal_exit(self):
        """ Signal to thread to exit """
        self.done.set()


class ThreadPool:
    """Pool of threads consuming tasks from a queue"""
    def __init__(self, num_threads, tasks=[]):
        self.tasks = Queue(num_threads)
        self.workers = []
        self.done = False
        self._init_workers(num_threads)
        for task in tasks:
            self.tasks.put(task)

    def _init_workers(self, num_threads):
        for i in range(num_threads):
            self.workers.append(Worker(self.tasks, i))

    def add_task(self, func, *args, **kwargs):
        """Add a task to the queue"""
        self.tasks.put((func, args, kwargs))

    def _close_all_threads(self):
        """ Signal all threads to exit and lose the references to them """
        for workr in self.workers:
            workr.signal_exit()
        self.workers = []

    def wait_completion(self):
        """Wait for completion of all the tasks in the queue"""
        self.tasks.join()

    def __del__(self):
        self._close_all_threads()


def create_task(func, *args, **kwargs):
    return (func, args, kwargs)
使用游泳池
from random import randrange
from time import sleep

delays = [randrange(1, 10) for i in range(30)]

def wait_delay(d):
    print('sleeping for (%d)sec' % d)
    sleep(d)

pool = ThreadPool(20)
for i, d in enumerate(delays):
    pool.add_task(wait_delay, d)
pool.wait_completion()

其他读者请注意:该代码是使用 Python 3 编写的(shebang #!/usr/bin/python3)。 - Daniel Marschall
为什么要使用 for i, d in enumerate(delays): 然后忽略 i 值? - martineau
@martineau - 可能只是开发过程中的遗留物,他们可能想在运行期间打印 i - n1k31t4
为什么要使用 create_task?它的作用是什么? - MrR
1
我无法相信在 Stack Overflow 上获得 4 票的答案就是在 Python 中执行 ThreadPooling 的正确方式。官方 Python 发行版中的 Threadpool 仍然有问题吗?我错过了什么? - MrR

7

另一种方法可以将该进程添加到线程队列池中。

import concurrent.futures
with concurrent.futures.ThreadPoolExecutor(max_workers=cpus) as executor:
    for i in range(10):
        a = executor.submit(arg1, arg2,....)

4

创建新进程的开销很小,特别是只有4个进程时。我怀疑这不是您应用程序的性能热点。保持简单,优化必要的地方以及性能分析结果指向的地方。


5
如果提问者使用的是Windows操作系统(我不确定他是否明确说明了),那么我认为进程启动可能会成为一个显著的开销。至少在我最近从事的项目中是这样的。 :-) - Brandon Rhodes

3

Python中没有内置的基于线程池的功能。但是,使用Queue类可以非常快速地实现一个生产者/消费者队列。

参考: https://docs.python.org/2/library/queue.html

from threading import Thread
from Queue import Queue
def worker():
    while True:
        item = q.get()
        do_work(item)
        q.task_done()

q = Queue()
for i in range(num_worker_threads):
     t = Thread(target=worker)
     t.daemon = True
     t.start()

for item in source():
    q.put(item)

q.join()       # block until all tasks are done

3
使用concurrent.futures模块就不再是这种情况了。 - Thanatos
12
我认为这个说法已经不再正确了。 from multiprocessing.pool import ThreadPool - Randall Hunt
1
multiprocessing.pool.ThreadPool没有文档,因为它的实现从未完成。它缺乏测试和文档。 - MrR

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接