多进程和队列结合DataFrame

3

我在使用队列实现两个进程之间的对象(数据帧)交换时遇到了一些问题。

第一个进程从队列中获取数据,第二个进程将数据放入队列中。由于放置数据的进程速度更快,所以获取数据的进程应该通过读取所有对象来清空队列。

我遇到了奇怪的行为,因为我的代码完美地按预期工作,但仅对于100行的数据帧而言,对于1000行的数据帧,获取数据的进程始终仅取得1个对象。

import multiprocessing, time, sys
import pandas as pd

NR_ROWS = 1000
i = 0
def getDf():
    global i, NR_ROWS
    myheader = ["name", "test2", "test3"]                
    myrow1 =   [ i,  i+400, i+250]
    df = pd.DataFrame([myrow1]*NR_ROWS, columns = myheader)
    i = i+1
    return df 


def f_put(q):
    print "f_put start"        

    while(1): 
        data = getDf()                
        q.put(data)
        print "P:", data["name"].iloc[0]         
        sys.stdout.flush()                    
        time.sleep(1.55)


def f_get(q):
    print "f_get start"    

    while(1):     
        data = pd.DataFrame()

        while not q.empty():
            data = q.get()
            print "get"

        if not data.empty:
            print "G:", data["name"].iloc[0] 
        else:
            print "nothing new"                       
        time.sleep(5.9)


if __name__ == "__main__":

    q = multiprocessing.Queue()

    p = multiprocessing.Process(target=f_put, args=(q,))            
    p.start()
    while(1):
        f_get(q)

    p.join()

当处理100行数据帧时,get-process会获取所有对象的输出。

f_get start
nothing new
f_put start
P: 0        # put 1.object into the queue
P: 1        # put 2.object into the queue
P: 2        # put 3.object into the queue
P: 3        # put 4.object into the queue
get         # get-process takes all 4 objects from the queue
get
get
get
G: 3
P: 4
P: 5
P: 6
get
get
get
G: 6
P: 7
P: 8

对于1000行数据框,get-process仅需要一个对象。

f_get start
nothing new
f_put start
P: 0        # put 1.object into the queue
P: 1        # put 2.object into the queue
P: 2        # put 3.object into the queue
P: 3        # put 4.object into the queue
get     <-- #!!! get-process takes ONLY 1 object from the queue!!!
G: 1
P: 4
P: 5
P: 6
get
G: 2
P: 7
P: 8
P: 9
P: 10
get
G: 3
P: 11

任何想法我做错了什么以及如何通过更大的数据框呢?

我快速测试了你的代码,并且证实它按照你所描述的方式正常工作,即使N > 1000。你是否使用一些旧版本的 pandas 和/或 multiprocessing 库导致了这种行为?(__version__: pandas 0.16.2, multiprocessing 0.70a1, python 2.7.10) - chris-sc
我现在已经更新了所有的软件包,但仍然没有得到预期的结果。请尝试访问http://pastebin.com/bihSv93F。第一次尝试是手动完成的,它可以正常工作,最后一个项目读取为G:2。然后我尝试使用多进程进行相同的操作,但它无法正常工作。 - Meloun
pandas: 0.16.2,multiprocessing: 0.70a1,python 2.7.10 - Meloun
当我使用大型字典而非数据框时,我得到相同的行为。 - Meloun
是的。请看下面我的回答。问题不仅限于 DataFrame,而是所有超过某个大小阈值的 Python 对象,这将取决于系统。 - chris-sc
1个回答

5

冒着不能完全提供完整正常示例的风险,以下是出现问题的原因。

首先,这是一个时间问题。

我使用更大的数据框(10000或者100000)再次尝试了你的代码,并且我开始看到与你一样的情况。这意味着只要数组的大小超过某个阈值,你就会看到这种行为,它将依赖于系统(CPU?)。

我对你的代码进行了一些修改,以便更容易看到发生了什么。首先,在不使用自定义time.sleep的情况下,将5个DataFrames放入队列中。在f_get函数中,我添加了一个计数器(以及一个time.sleep(0),请参见下面的代码)到循环中(while not q.empty())。

新代码:

import multiprocessing, time, sys                                                 
import pandas as pd                                                              

NR_ROWS = 10000                                                                  
i = 0                                                                            
def getDf():                                                                     
    global i, NR_ROWS                                                            
    myheader = ["name", "test2", "test3"]                                        
    myrow1 =   [ i,  i+400, i+250]                                               
    df = pd.DataFrame([myrow1]*NR_ROWS, columns = myheader)                      
    i = i+1                                                                      
    return df                                                                    

def f_put(q):                                                                    
    print "f_put start"                                                          
    j = 0                                                                        
    while(j < 5):                                                                
        data = getDf()                                                           
        q.put(data)                                                              
        print "P:", data["name"].iloc[0]                                         
        sys.stdout.flush()                                                       
        j += 1                                                                   

def f_get(q):                                                                    
    print "f_get start"                                                          
    while(1):
        data = pd.DataFrame()                                                    
        loop = 0                                                                 
        while not q.empty():                                                     
            data = q.get()                                                  
            print "get (loop: %s)" %loop
            time.sleep(0)                                         
            loop += 1                                                            
        time.sleep(1.)                                                           

if __name__ == "__main__":                                                       

    q = multiprocessing.Queue()                                                  
    p = multiprocessing.Process(target=f_put, args=(q,))                         
    p.start()                                                                    
    while(1):                                                                    
        f_get(q)                                                                 
    p.join()

现在,如果你对不同行数运行这个程序,你会看到类似于这样的结果:

N=100:

f_get start
f_put start
P: 0
P: 1
P: 2
P: 3
P: 4
get (loop: 0)
get (loop: 1)
get (loop: 2)
get (loop: 3)
get (loop: 4)

N=10000:

f_get start
f_put start
P: 0
P: 1
P: 2
P: 3
P: 4
get (loop: 0)
get (loop: 1)
get (loop: 0)
get (loop: 0)
get (loop: 0)

这告诉我们什么? 只要 DataFrame 很小,你的假设认为 put 操作比 get 操作更快是正确的,在 while not q.empty() 循环中我们可以获取所有 5 个项目。

但是,随着行数的增加,情况发生了变化。循环条件 q.empty() 计算为 True(队列为空)并且外部的 while(1) 周期执行。

这可能意味着 put 现在比 get 慢,我们需要等待。但是如果我们将整个 f_get 的休眠时间设置为像 15 这样的值,我们仍然会得到相同的行为。

另一方面,如果我们将内部 q.get() 循环中的 time.sleep(0) 改为 1,

while not q.empty():                                                  
    data = q.get()                                                    
    time.sleep(1)                                                     
    print "get (loop: %s)" %loop                                      
    loop += 1

我们收到了这个:
f_get start
f_put start
P: 0
P: 1
P: 2
P: 3
P: 4
get (loop: 0)
get (loop: 1)
get (loop: 2)
get (loop: 3)
get (loop: 4)

看起来没问题!这意味着实际上get做了一些奇怪的事情。似乎在它仍在处理get时,队列状态是empty,并且在完成get之后,下一个项目就可用了。

我相信有理由这样做,但我对multiprocessing不够熟悉,无法看到这一点。

根据您的应用程序,您可以在内部循环中添加适当的time.sleep,看看是否足够。

或者,如果您想解决它(而不是使用time.sleep方法的解决方法),您可以查看multiprocessing并寻找关于阻塞、非阻塞或异步通信的信息-我认为解决方案将在那里找到。


1
下午好,各位!你们解决问题了吗?我也是这个领域的新手。但如果我们在一个工作进程与队列交互期间使用阻塞,这会解决问题吗? - Blademoon

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接