Python 无法启动新线程多进程处理

5

我正在尝试使用一组计算机集群来运行数百万个小型模拟。为此,我尝试在我的主要计算机上设置了两个“服务器”,一个用于将输入变量添加到网络中的队列中,另一个用于处理结果。

以下是将内容放入模拟变量队列的代码:

"""This script reads start parameters and calls on run_sim to run the
simulations"""
import time
from multiprocessing import Process, freeze_support, Manager, Value, Queue, current_process
from multiprocessing.managers import BaseManager


class QueueManager(BaseManager):
    pass


class MultiComputers(Process):
    def __init__(self, sim_name, queue):
        self.sim_name = sim_name
        self.queue = queue
        super(MultiComputers, self).__init__()

    def get_sim_obj(self, offset, db):
        """returns a list of lists from a database query"""

    def handle_queue(self):
        self.sim_nr = 0
        sims = self.get_sim_obj()
        self.total = len(sims)
        while len(sims) > 0:
            if self.queue.qsize() > 100:
                self.queue.put(sims[0])
                self.sim_nr += 1
                print(self.sim_nr, round(self.sim_nr/self.total * 100, 2), self.queue.qsize())
                del sims[0]

    def run(self):
        self.handle_queue()

if __name__ == '__main__':
    freeze_support()
    queue = Queue()
    w = MultiComputers('seed_1_hundred', queue)
    w.start()
    QueueManager.register('get_queue', callable=lambda: queue)
    m = QueueManager(address=('', 8001), authkey=b'abracadabra')
    s = m.get_server()
    s.serve_forever()

接下来,这个队列被运行以处理模拟结果:

__author__ = 'axa'
from multiprocessing import Process, freeze_support, Queue
from multiprocessing.managers import BaseManager
import time


class QueueManager(BaseManager):
    pass


class SaveFromMultiComp(Process):
    def __init__(self, sim_name, queue):
        self.sim_name = sim_name
        self.queue = queue
        super(SaveFromMultiComp, self).__init__()

    def run(self):
        res_got = 0
        with open('sim_type1_' + self.sim_name, 'a') as f_1:
            with open('sim_type2_' + self.sim_name, 'a') as f_2:
                while True:
                    if self.queue.qsize() > 0:
                        while self.queue.qsize() > 0:
                            res = self.queue.get()
                            res_got += 1
                            if res[0] == 1:
                                f_1.write(str(res[1]) + '\n')
                            elif res[0] == 2:
                                f_2.write(str(res[1]) + '\n')
                            print(res_got)
                    time.sleep(0.5)


if __name__ == '__main__':
    queue = Queue()
    w = SaveFromMultiComp('seed_1_hundred', queue)
    w.start()
    m = QueueManager(address=('', 8002), authkey=b'abracadabra')
    s = m.get_server()
    s.serve_forever()

这些脚本在处理前7-800个模拟时按预期工作,但在那之后,我在运行接收结果脚本的终端中遇到了以下错误:
Exception in thread Thread-1:
Traceback (most recent call last):
  File "C:\Python35\lib\threading.py", line 914, in _bootstrap_inner
    self.run()
  File "C:\Python35\lib\threading.py", line 862, in run
    self._target(*self._args, **self._kwargs)
  File "C:\Python35\lib\multiprocessing\managers.py", line 177, in accepter
    t.start()
  File "C:\Python35\lib\threading.py", line 844, in start
    _start_new_thread(self._bootstrap, ())
RuntimeError: can't start new thread

有人能解释一下线程是在哪里和如何生成的吗?每次调用queue.get()时是否会生成新的线程,或者它是如何工作的? 如果有人知道我应该怎么做来避免这种失败,我将非常高兴。(我正在使用Python3.5-32运行脚本)


为什么您选择不在 with multiprocessing.Manager() as manager: 块中使用从调用 queue = manager.Queue() 返回的队列实例? - Booboo
你可以尝试使用连接池:https://dev59.com/y3A75IYBdhLWcg3w6Nr8#62396445 - fghoussen
你可以尝试使用连接池:https://dev59.com/y3A75IYBdhLWcg3w6Nr8#62396445 - fghoussen
2个回答

4
所有的迹象都表明您的系统缺少启动线程所需的资源(可能是内存,但也可能是泄漏的线程或其他资源)。您可以使用操作系统监控工具(Linux上的top,Windows上的Resource Monitor)查看线程数量和内存使用情况来跟踪此问题,但我建议您使用更简单、更高效的编程模式。
虽然不完全相同,但通常会出现C10K问题,它指出等待结果的阻塞线程无法很好地扩展,并且容易出现泄漏错误,如此类问题。解决方案是实现异步IO模式(一个阻塞线程启动其他工作线程),这在Web服务器中非常简单。
像Python的aiohttp这样的框架应该非常适合您的要求。您只需要一个处理程序,可以获取远程代码的ID和结果。希望该框架可以为您处理扩展问题。
因此,在您的情况下,您可以保留启动代码,但在远程机器上启动进程后,终止该线程。然后,让远程代码向您的服务器发送HTTP消息,其中包括1)其ID和2)其结果。再加入一些额外的代码,如果没有得到200个“OK”状态代码就请求重试,您应该可以有更好的解决方案。

0
我认为您的系统运行了太多线程。我建议您首先检查系统资源,然后重新考虑程序。 尝试限制您的线程数量,并尽可能使用较少的线程。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接