使用多进程模块结束守护进程

4
我在下面提供了一个多进程使用示例。这是一个进程池模型。虽然结构与我实际使用的代码相对接近,但并不像可能那么简单。它还使用了sqlalchemy,抱歉。
我的问题是 - 我目前有一个相对长时间运行的Python脚本,它执行了许多类似下面代码的函数,因此父进程在所有情况下都是相同的。换句话说,一个Python脚本创建了多个进程池。(我想我没有必须这样做,但另一种选择是使用类似于os.system和subprocess的东西。)问题是这些进程会停留在内存中。文档说这些守护进程应该一直保持到父进程退出,但如果父进程继续生成另一个池或进程并且不立即退出呢?
调用terminate()可以解决问题,但这看起来并不太礼貌。有没有一种好的方法可以请求进程优雅地终止?即自己清理并现在离开,我需要启动下一个进程池吗?
我还尝试在进程上调用join()。根据文档,这意味着等待进程终止。如果它们不打算终止怎么办?实际发生的是进程挂起。
提前感谢您的回答。
敬礼,Faheem。
import multiprocessing, time

class Worker(multiprocessing.Process):
    """Process executing tasks from a given tasks queue"""
    def __init__(self, queue, num):
        multiprocessing.Process.__init__(self)
        self.num = num
        self.queue = queue
        self.daemon = True

    def run(self):
        import traceback
        while True:
            func, args, kargs = self.queue.get()
            try:
                print "trying %s with args %s"%(func.__name__, args)
                func(*args, **kargs)
            except:
                traceback.print_exc()
            self.queue.task_done()

class ProcessPool:
    """Pool of threads consuming tasks from a queue"""
    def __init__(self, num_threads):
        self.queue = multiprocessing.JoinableQueue()
        self.workerlist = []
        self.num = num_threads
        for i in range(num_threads):
            self.workerlist.append(Worker(self.queue, i))

    def add_task(self, func, *args, **kargs):
        """Add a task to the queue"""
        self.queue.put((func, args, kargs))

    def start(self):
        for w in self.workerlist:
            w.start()

    def wait_completion(self):
        """Wait for completion of all the tasks in the queue"""
        self.queue.join()
        for worker in self.workerlist:
            print worker.__dict__
            #worker.terminate()        <--- terminate used here  
            worker.join()              <--- join used here

start = time.time()

from sqlalchemy import *
from sqlalchemy.orm import *

dbuser = ''
password = ''
dbname = ''
dbstring = "postgres://%s:%s@localhost:5432/%s"%(dbuser, password, dbname)
db = create_engine(dbstring, echo=True)
m = MetaData(db)

def make_foo(i):
    t1 = Table('foo%s'%i, m, Column('a', Integer, primary_key=True))

conn = db.connect()
for i in range(10):
    conn.execute("DROP TABLE IF EXISTS foo%s"%i)
conn.close()

for i in range(10):
    make_foo(i)

m.create_all()

def do(i, dbstring):
    dbstring = "postgres://%s:%s@localhost:5432/%s"%(dbuser, password, dbname)
    db = create_engine(dbstring, echo=True)
    Session = scoped_session(sessionmaker())
    Session.configure(bind=db)
    Session.execute("ALTER TABLE foo%s SET ( autovacuum_enabled = false );"%i)
    Session.execute("ALTER TABLE foo%s SET ( autovacuum_enabled = true );"%i)
    Session.commit()

pool = ProcessPool(5)
for i in range(10):
    pool.add_task(do, i, dbstring)
pool.start()
pool.wait_completion()
2个回答

3

我的解决方法是:

import multiprocessing

for prc in multiprocessing.active_children():
    prc.terminate()

我更喜欢这个方法,因为我不需要在工作函数中添加一些if语句而导致代码混乱。


2
你已经知道了multiprocessing已经有了工作进程的类,对吧?
标准的方法是向你的线程发送退出信号:
queue.put(("QUIT", None, None))

然后检查它:

if func == "QUIT":
    return

嗨,托马斯。感谢您的帮助回复。是的,我曾经考虑过使用multiprocess.Pool。也许这比自己制作的解决方案更好。如果您认为是这种情况,请评论一下。嗯,map_async看起来像是一个好选择。我可以这样做pool.map_async(do,range(10),callback=results.append)。虽然我想传递多个参数。谢谢您的队列放置建议。我正在查看信号和管道,我想这是不必要的复杂性。顺便说一下,有趣的博客。问候,法欣。 - Faheem Mitha
不客气。一般来说,除非有某些原因它不能满足你的需求,否则我总是会选择现成的解决方案。任何类型的“map”函数只能传递一个参数,但是有办法绕过这个限制。 - Thomas K
好的,最简单的方法是将所有参数都封装到一个参数中,例如使用字典。我不太清楚apply_async和map_async之间的优势,但我想一个结果对象比多个更好,所以我猜我会使用map_async。谢谢。 - Faheem Mitha
@Faheem:这是一种方法,另一种方法是使用zip()将它们连接在一起。或者如果只有一个参数在变化,你可以将其他参数分别传递给函数(尽管使用直接池可能会更棘手)。 - Thomas K

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接