Python中的视频帧多进程处理

5
我对Python中的多进程编程还不熟悉。我想从每个小时长的视频文件的每一帧中提取特征。处理每一帧需要大约30毫秒。因为每一帧都是独立处理的,所以我认为使用多进程是一个好主意。
我想将特征提取的结果存储在一个自定义类中。
我阅读了一些示例代码,并按照这里的建议使用了多进程和队列。但结果令人失望,现在每帧处理时间约为1000毫秒。我猜测我产生了大量的开销。
有更有效的方法可以并行处理帧并收集结果吗?
为了说明问题,我准备了一个虚拟示例。
import multiprocessing as mp
from multiprocessing import Process, Queue
import numpy as np
import cv2

def main():
    #path='path\to\some\video.avi'
    coordinates=np.random.random((1000,2))
    #video = cv2.VideoCapture(path)
    listOf_FuncAndArgLists=[]

    for i in range(50):
        #video.set(cv2.CAP_PROP_POS_FRAMES,i)
        #img_frame_original = video.read()[1]
        #img_frame_original=cv2.cvtColor(img_frame_original, cv2.COLOR_BGR2GRAY)
        img_frame_dummy=np.random.random((300,300)) #using dummy image for this example
        frame_coordinates=coordinates[i,:]
        listOf_FuncAndArgLists.append([parallel_function,frame_coordinates,i,img_frame_dummy])

    queues=[Queue() for fff in listOf_FuncAndArgLists] #create a queue object for each function
    jobs = [Process(target=storeOutputFFF,args=[funcArgs[0],funcArgs[1:],queues[iii]]) for iii,funcArgs in enumerate(listOf_FuncAndArgLists)]
    for job in jobs: job.start() # Launch them all
    for job in jobs: job.join() # Wait for them all to finish
    # And now, collect all the outputs:
    return([queue.get() for queue in queues])         

def storeOutputFFF(fff,theArgs,que): #add a argument to function for assigning a queue
    print 'MULTIPROCESSING: Launching %s in parallel '%fff.func_name
    que.put(fff(*theArgs)) #we're putting return value into queue

def parallel_function(frame_coordinates,i,img_frame_original):
    #do some image processing that takes about 20-30 ms
    dummyResult=np.argmax(img_frame_original)
    return(resultClass(dummyResult,i))

class resultClass(object):
    def __init__(self,maxIntensity,i):
        self.maxIntensity=maxIntensity
        self.i=i

if __name__ == '__main__':
    mp.freeze_support()
    a=main()
    [x.maxIntensity for x in a]
2个回答

2
在(普通的)Python中进行并行处理有些麻烦:在其他语言中,我们只需使用线程,但全局解释器锁使得这很棘手,而使用多进程在数据传输方面存在很大的开销。我发现细粒度的并行性相对难以实现,而在单个进程中处理需要10秒或更长时间的工作“块”可能会更加直观。
如果您正在UNIX系统上,更简便的并行处理路径将是创建一个Python程序,该程序处理命令行指定的视频段(即要开始的帧编号和要处理的帧数),然后使用GNU parallel工具同时处理多个段。第二个Python程序可以从一组文件中整合结果,或从stdin读取,通过parallel管道传输。这种方式意味着处理代码无需自己实现并行处理,但需要多次访问输入文件并从中间点提取帧。(这也可以扩展到跨多台机器工作而无需改变Python代码...)
使用 multiprocessing.Pool.map 可以以类似的方式使用,如果您需要一个纯 Python 的解决方案:对元组列表(例如 (file, startframe, endframe))进行映射,然后在函数中打开文件并处理该片段。

我正在使用Windows系统。我想尝试分块处理,但是在管理多个进程对视频文件的访问时遇到了困难。仅仅让每个进程通过openCV尝试读取帧会导致Python挂起。 - jlarsch
1
我建议逐步测试。编写一个脚本来处理视频的一部分并检查其是否正常工作(不要在任何地方使用多进程)。然后尝试同时运行两次该脚本(即使它们正在处理文件的相同部分),以检查多个进程从输入文件读取时是否存在问题。除非有东西锁定了文件或以其他方式防止对其进行读取访问,否则应该是可以的,但值得在隔离环境中检查。最后,扩展脚本以使用多进程。这应该有助于确定它挂起的位置和原因! - Wuggy
我刚刚发现我可以同时从多个进程中读取同一个视频文件。问题似乎在于将过大的变量传递到队列中会导致我的Python程序冻结。我想知道是否有参数可以更改队列大小?或者有没有其他方法来返回值? - jlarsch
看起来解决那个冻结的方法是在主函数中省略上面的join命令,正如文档中“使用队列连接进程”部分所指出的那样:https://docs.python.org/2/library/multiprocessing.html#programming-guidelines - jlarsch
不错的发现!我尽量避免在多进程中使用低级别的东西,而是尝试使用multiprocessing.Pool.map...map_async方法。当我必须直接使用ProcessQueue时,我会制作自己的类似于Pool的类,并通过向工作进程发送特定的“终止”命令来处理清理。然后,在清理期间,Process.join()实际上是最后完成的事情,因为它们将已退出(或很快将退出)。 - Wuggy

1

多进程会增加启动多个进程和将它们全部汇总的开销。

你的代码对于每一帧都这样做。

尝试将视频分成N个大小相等的部分,并在并行处理它们

将N设置为您机器上的核心数量或类似的数字(您的情况可能有所不同,但这是一个很好的开始实验的数字)。如果说有50个进程被创建,但只有4个在执行,其余的则只是等待它们的轮到而已,那么这样做就没有意义。


我的问题是这个建议需要读取每个进程中的帧,这会导致我的Python崩溃。我认为这之前已经有人描述过了:https://dev59.com/FYXca4cB1Zd3GeqPGlKN?rq=1 - jlarsch
读取尽可能多的帧数,以主进程为例,然后将这些帧分成N个块并并行处理,反复执行此过程如何? - Daerdemandt

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接