同时执行多个函数

52

我试图在Python中同时运行两个函数。我尝试了使用 multiprocessing 的以下代码,但当我执行代码时,第二个函数仅在第一个函数完成后才开始。

from multiprocessing import Process
def func1:
     #does something

def func2:
     #does something

if __name__=='__main__':
     p1 = Process(target = func1)
     p1.start()
     p2 = Process(target = func2)
     p2.start()

5
第一个是快速完成,您确定吗?此外,请确保这些进程确实是独立的,不会等待另一个进程正在使用的资源或另一个进程将产生的数据。 - user2357112
你读过这篇帖子吗?如何同时运行两个函数 - zionpi
@user2357112:第一个函数在我运行时需要大约2分钟才能执行,而且这两个函数是完全独立的。 - user2739601
可能是重复的问题: Python:如何并行运行Python函数? - OrangeDog
6个回答

70
你做得很正确。 :)
尝试运行这个愚蠢的代码片段:
from multiprocessing import Process
import sys

rocket = 0

def func1():
    global rocket
    print 'start func1'
    while rocket < sys.maxint:
        rocket += 1
    print 'end func1'

def func2():
    global rocket
    print 'start func2'
    while rocket < sys.maxint:
        rocket += 1
    print 'end func2'

if __name__=='__main__':
    p1 = Process(target = func1)
    p1.start()
    p2 = Process(target = func2)
    p2.start()

你会看到它打印出“start func1”,然后是“start func2”,然后经过(非常)长的一段时间,最终你会看到函数结束。但它们确实是同时执行的。

因为进程启动需要一段时间,所以你甚至可能会在“start func1”之前看到“start func2”


有人可以更新这个到Python 3吗? - Nic Scozzaro
在这段代码中是否需要关闭进程?我感觉使用多进程后内存没有释放。 - mahshid.r

34

这正是我所需要的。虽然没有人要求,但我修改了Shashank的代码以适用于Python 3,方便其他寻找解决方案的人 :)

from multiprocessing import Process
import sys

rocket = 0

def func1():
    global rocket
    print ('start func1')
    while rocket < sys.maxsize:
        rocket += 1
    print ('end func1')

def func2():
    global rocket
    print ('start func2')
    while rocket < sys.maxsize:
        rocket += 1
    print ('end func2')

if __name__=='__main__':
    p1 = Process(target=func1)
    p1.start()
    p2 = Process(target=func2)
    p2.start()

将sys.maxsize替换为一个数字,然后打印(rocket),您就可以看到它一次计数一次。达到一个数字并停止。


2
你有任何想法为什么这个代码完全没有作用吗? - CapnShanty
3
如何使用接收输入参数的函数实现这个? - hirschme
图书馆里可能有些问题。我一直收到以下错误消息:AttributeError: module 'threading' has no attribute '_shutdown'。 - RandallShanePhD
1
@hirschme 在这里跟进。https://docs.python.org/3/library/multiprocessing.html 您可以使用 Process(target=nameOfFunction, args=(arg1, arg2, )) - Rahul Bordoloi

11

使用Ray可以优雅地实现这一点,它是一个系统,可以轻松并行化和分发Python代码。

要并行化您的示例,您需要使用@ray.remote修饰符定义函数,然后使用.remote调用它们。

import ray

ray.init()

# Define functions you want to execute in parallel using 
# the ray.remote decorator.
@ray.remote
def func1():
    #does something

@ray.remote
def func2():
    #does something

# Execute func1 and func2 in parallel.
ray.get([func1.remote(), func2.remote()])

如果func1()func2()返回结果,您需要按照以下方式重写代码:
ret_id1 = func1.remote()
ret_id2 = func1.remote()
ret1, ret2 = ray.get([ret_id1, ret_id2])

使用Ray而非multiprocessing模块有许多优点。特别的,同样的代码可以在单台机器上和机器集群上运行。更多关于Ray的优点请参见这篇相关文章


2
很不幸,对于我来说,Windows 没有光线追踪分发。 - JediCate
这个库是一个天才之作。在这里查看演示:https://ray.readthedocs.io/en/latest/walkthrough.html。它减少了尝试配置池和监视资源的开销,并提供了使用可用CPU和GPU资源的自动化方法。强烈推荐! - RandallShanePhD
很不幸,它目前在类属性方面存在问题,因此 getter/setter 方法也有问题 :-( - leonard vertighel

8

这是@Shashank提供的一个非常好的例子。我只想说,我不得不在最后添加join,否则两个进程将无法同时运行:

from multiprocessing import Process
import sys

rocket = 0

def func1():
    global rocket
    print 'start func1'
    while rocket < sys.maxint:
        rocket += 1
    print 'end func1'

def func2():
    global rocket
    print 'start func2'
    while rocket < sys.maxint:
        rocket += 1
    print 'end func2'

if __name__=='__main__':
    p1 = Process(target = func1)
    p1.start()
    p2 = Process(target = func2)
    p2.start()
    # This is where I had to add the join() function.
    p1.join()
    p2.join()

此外,看看这个帖子: 何时在进程上调用.join()方法?

process.join 几乎是所有多进程示例脚本中缺失的部分,没有它,进程会按顺序运行,感谢指出这一点。 - nish
假设func1()需要一个输入参数。你能举个例子来演示如何传递参数吗? - Syed Md Ismail

4

如果需要运行动态进程列表,请看下面的另一种版本。我会提供两个shell脚本,你可以试试:

t1.sh

for i in {1..10}
  do 
     echo "1... t.sh i:"$i
     sleep 1
  done

t2.sh

   for i in {1..3}
   do
       echo "2.. t2.sh i:"$i
       sleep 1
   done

np.py

import os
from multiprocessing import Process, Lock

def f(l, cmd):
    os.system(cmd)

if __name__ == '__main__':
    lock = Lock()

    for cmd in ['sh t1.sh', 'sh t2.sh']:
        Process(target=f, args=(lock, cmd)).start()

输出

1... t.sh i:1
2.. t2.sh i:1
1... t.sh i:2
2.. t2.sh i:2
1... t.sh i:3
2.. t2.sh i:3
1... t.sh i:4
1... t.sh i:5
1... t.sh i:6
1... t.sh i:7
1... t.sh i:8
1... t.sh i:9
1... t.sh i:10

在任务 "l.acquire()" 之前可以获取 "lock",并在 "l.release()" 之后释放。


0
#Try by using threads instead of multiprocessing

#from multiprocessing import Process
#import sys

import time
import threading
import random

rocket = 0

def func1():
    global rocket
    print('start func1')
    while rocket < 100:
        print("Im in func1")
        rocket += 1
        value = "Im global var "+str(rocket)+" from fun1"
        print(value)

    print ('end func1')

def func2():
    global rocket
    print ('start func2')
    while rocket < 100:
        print("Im in func2")
        rocket += 1
        value = "Im global var " + str(rocket) + " from fun2"
        print(value)
    print ('end func2')

if __name__=='__main__':
    p1 = threading.Thread(target=func1)
    p2 = threading.Thread(target=func2)
    p1.start();p2.start()

#Hope it works

阅读如何撰写一个好的答案? - user13146129
线程和多进程是完全不同的东西。 - EMT

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接