我在下面提供了一个多进程使用示例。这是一个进程池模型。虽然结构与我实际使用的代码相对接近,但并不像可能那么简单。它还使用了sqlalchemy,抱歉。
我的问题是 - 我目前有一个相对长时间运行的Python脚本,它执行了许多类似下面代码的函数,因此父进程在所有情况下都是相同的。换句话说,一个Python脚本创建了多个进程池。(我想我没有必须这样做,但另一种选择是使用类似于os.system和subprocess的东西。)问题是这些进程会停留在内存中。文档说这些守护进程应该一直保持到父进程退出,但如果父进程继续生成另一个池或进程并且不立即退出呢?
调用terminate()可以解决问题,但这看起来并不太礼貌。有没有一种好的方法可以请求进程优雅地终止?即自己清理并现在离开,我需要启动下一个进程池吗?
我还尝试在进程上调用join()。根据文档,这意味着等待进程终止。如果它们不打算终止怎么办?实际发生的是进程挂起。
提前感谢您的回答。
敬礼,Faheem。
我的问题是 - 我目前有一个相对长时间运行的Python脚本,它执行了许多类似下面代码的函数,因此父进程在所有情况下都是相同的。换句话说,一个Python脚本创建了多个进程池。(我想我没有必须这样做,但另一种选择是使用类似于os.system和subprocess的东西。)问题是这些进程会停留在内存中。文档说这些守护进程应该一直保持到父进程退出,但如果父进程继续生成另一个池或进程并且不立即退出呢?
调用terminate()可以解决问题,但这看起来并不太礼貌。有没有一种好的方法可以请求进程优雅地终止?即自己清理并现在离开,我需要启动下一个进程池吗?
我还尝试在进程上调用join()。根据文档,这意味着等待进程终止。如果它们不打算终止怎么办?实际发生的是进程挂起。
提前感谢您的回答。
敬礼,Faheem。
import multiprocessing, time
class Worker(multiprocessing.Process):
"""Process executing tasks from a given tasks queue"""
def __init__(self, queue, num):
multiprocessing.Process.__init__(self)
self.num = num
self.queue = queue
self.daemon = True
def run(self):
import traceback
while True:
func, args, kargs = self.queue.get()
try:
print "trying %s with args %s"%(func.__name__, args)
func(*args, **kargs)
except:
traceback.print_exc()
self.queue.task_done()
class ProcessPool:
"""Pool of threads consuming tasks from a queue"""
def __init__(self, num_threads):
self.queue = multiprocessing.JoinableQueue()
self.workerlist = []
self.num = num_threads
for i in range(num_threads):
self.workerlist.append(Worker(self.queue, i))
def add_task(self, func, *args, **kargs):
"""Add a task to the queue"""
self.queue.put((func, args, kargs))
def start(self):
for w in self.workerlist:
w.start()
def wait_completion(self):
"""Wait for completion of all the tasks in the queue"""
self.queue.join()
for worker in self.workerlist:
print worker.__dict__
#worker.terminate() <--- terminate used here
worker.join() <--- join used here
start = time.time()
from sqlalchemy import *
from sqlalchemy.orm import *
dbuser = ''
password = ''
dbname = ''
dbstring = "postgres://%s:%s@localhost:5432/%s"%(dbuser, password, dbname)
db = create_engine(dbstring, echo=True)
m = MetaData(db)
def make_foo(i):
t1 = Table('foo%s'%i, m, Column('a', Integer, primary_key=True))
conn = db.connect()
for i in range(10):
conn.execute("DROP TABLE IF EXISTS foo%s"%i)
conn.close()
for i in range(10):
make_foo(i)
m.create_all()
def do(i, dbstring):
dbstring = "postgres://%s:%s@localhost:5432/%s"%(dbuser, password, dbname)
db = create_engine(dbstring, echo=True)
Session = scoped_session(sessionmaker())
Session.configure(bind=db)
Session.execute("ALTER TABLE foo%s SET ( autovacuum_enabled = false );"%i)
Session.execute("ALTER TABLE foo%s SET ( autovacuum_enabled = true );"%i)
Session.commit()
pool = ProcessPool(5)
for i in range(10):
pool.add_task(do, i, dbstring)
pool.start()
pool.wait_completion()
zip()
将它们连接在一起。或者如果只有一个参数在变化,你可以将其他参数分别传递给函数(尽管使用直接池可能会更棘手)。 - Thomas K