多进程池映射和带两个参数函数

10

我正在使用 multiprocessing.Pool()

这是我想要使用池的内容:

def insert_and_process(file_to_process,db):
    db = DAL("path_to_mysql" + db)
    #Table Definations
    db.table.insert(**parse_file(file_to_process))
    return True

if __name__=="__main__":
    file_list=os.listdir(".")
    P = Pool(processes=4)
    P.map(insert_and_process,file_list,db) # here having problem.
我想传递2个参数。我想做的是仅初始化4个数据库连接(在每次函数调用时尝试创建连接,可能会有数百万次调用,并导致IO死锁)。如果我可以为每个进程创建4个数据库连接之一,那就可以了。 是否有关于池的解决方案?还是我应该放弃它? 编辑: 通过你们两个人的帮助,我得到了这个:
args=zip(f,cycle(dbs))
Out[-]: 
[('f1', 'db1'),
 ('f2', 'db2'),
 ('f3', 'db3'),
 ('f4', 'db4'),
 ('f5', 'db1'),
 ('f6', 'db2'),
 ('f7', 'db3'),
 ('f8', 'db4'),
 ('f9', 'db1'),
 ('f10', 'db2'),
 ('f11', 'db3'),
 ('f12', 'db4')]

那么这就是它的工作方式,我将把DB连接代码移到主级别并执行以下操作:

def process_and_insert(args):

    #Table Definations
    args[1].table.insert(**parse_file(args[0]))
    return True

if __name__=="__main__":
    file_list=os.listdir(".")
    P = Pool(processes=4)

    dbs = [DAL("path_to_mysql/database") for i in range(0,3)]
    args=zip(file_list,cycle(dbs))
    P.map(insert_and_process,args) # here having problem.

好的,我会测试它并告诉你们结果。

5个回答

阿里云服务器只需要99元/年,新老用户同享,点击查看详情
29

Pool文档没有说明如何将多个参数传递给目标函数,我尝试了将参数作为序列进行传递,但是并没有展开(即序列的每个项目都分别作为一个参数)。

不过,你可以编写目标函数来期望第一个(也是唯一的)参数是一个元组,在该元组中,每个元素都是你所期望的参数之一:

from itertools import repeat

def insert_and_process((file_to_process,db)):
    db = DAL("path_to_mysql" + db)
    #Table Definations
    db.table.insert(**parse_file(file_to_process))
    return True

if __name__=="__main__":
    file_list=os.listdir(".")
    P = Pool(processes=4)
    P.map(insert_and_process,zip(file_list,repeat(db))) 

(注意在insert_and_process的定义中有额外的括号 - Python将其视为一个应该是2项序列的单个参数。序列的第一个元素归属于第一个变量,而另一个则属于第二个)


4
请注意,Python 3 中已不再支持 def f((arg1, arg2)): 这种语法。 - Ferdinand Beyer
1
@FerdinandBeyer:我忘记了。除非multiprocessing.Pool.map的实现不同,否则最好的方法是将其分配给单个参数,然后在函数内部进行解包。 - jsbueno
谢谢,我搞定了!我通过zip(file_list,cycle(dbs))实现了它。但是我没有使用f((arg1,arg2))。随着我使用了更多你的代码,我选择了你! - Phyo Arkar Lwin

8

您的进程池将会生成四个进程,每个进程都由自己的Python解释器实例运行。您可以使用全局变量来保存数据库连接对象,以便每个进程仅创建一个连接:

global_db = None

def insert_and_process(file_to_process, db):
    global global_db
    if global_db is None:
        # If this is the first time this function is called within this
        # process, create a new connection.  Otherwise, the global variable
        # already holds a connection established by a former call.
        global_db = DAL("path_to_mysql" + db)
    global_db.table.insert(**parse_file(file_to_process))
    return True

由于Pool.map()等方法只支持一个参数的工作函数,您需要创建一个包装器来转发工作:

def insert_and_process_helper(args):
    return insert_and_process(*args)

if __name__ == "__main__":
    file_list=os.listdir(".")
    db = "wherever you get your db"
    # Create argument tuples for each function call:
    jobs = [(file, db) for file in file_list]
    P = Pool(processes=4)
    P.map(insert_and_process_helper, jobs)

谢谢Ferdinand,这已经接近我想要的了。我想做的是创建4个数据库连接。每个进程一个连接,而不是每个函数调用一个连接。DAL("Path To db")将创建一个数据库连接。单个连接比同时使用四个连接慢。 - Phyo Arkar Lwin
我尝试过这些例子,并且当函数不需要返回时,它们都能正常工作...我们不能像这样做吗:my_var=P.map(insert_and_process_helper, jobs)? - neverMind
如果我将第二个参数设置为列表或集合,它会起作用吗? - Ishan Bhatt

5

不需要使用zip。例如,如果您有2个参数x和y,并且它们各自可以获得多个值,如:

X=range(1,6)
Y=range(10)
该函数应该只接收一个参数,并在内部进行解包:
def func(params):
    (x,y)=params
    ...

你可以这样调用:

params = [(x,y) for x in X for y in Y]
pool.map(func, params)

3
你可以使用

标签

from functools import partial 

为此目的而设计的库

例如

func = partial(rdc, lat, lng)
r = pool.map(func, range(8))

并且

def rdc(lat,lng,x):
    pass 

2

使用

params=[(x,y) for x in X for y in Y]

你需要创建 xy 的完整副本,这可能比使用其他方法更慢。

from itertools import repeat
P.map(insert_and_process,zip(file_list,repeat(db)))

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,