使用多进程在 Pandas 中过滤大型数据框。

4
我有一个数据框,需要根据以下条件进行筛选:
CITY == 'Mumbai' & LANGUAGE == 'English' & GENRE == 'ACTION' & count_GENRE >= 1
CITY == 'Mumbai' & LANGUAGE == 'English' & GENRE == 'ROMANCE' & count_GENRE >= 1
CITY == 'Mumbai' & LANGUAGE == 'Hindi' & count_LANGUAGE >= 1 & GENRE == 'ACTION' 

当我试图通过

这样做时

  df1 = df.query(condition1)
  df2 = df.query(condition2)

由于我的数据框架非常大,我遇到了内存错误。

因此,我计划按照主条件和子条件进行筛选,这样可以减少负载并提高性能。

通过解析上述条件,我设法得到:

main_filter = "CITY == 'Mumbai'"
sub_cond1 = "LANGUAGE == 'English'"
sub_cond1_cond1 = "GENRE == 'ACTION' & count_GENRE >= 1"
sub_cond1_cond2 = "GENRE == 'ROMANCE' & count_GENRE >= 1"
sub_cond2 = "LANGUAGE == 'Hindi' & count_LANGUGE >= 1"
sub_cond2_cond1 = "GENRE == 'COMEDY'"

把它想象成一棵树形结构(当然不是二叉树,实际上它根本不是一棵树)。

现在我想采用多进程方法(深层——子进程嵌套在子进程下)。

现在我想要类似于这样的东西

on level 1
 df = df_main.query(main_filter)
on level 2
 df1 = df.query(sub_cond1)
 df2 = df.query(sub_cond2)
onlevel 3
  df11 = df1.query(sub_cond1_cond1)
  df12 = df1.query(sub_cond1_cond2)
  df21 = df2.query(sub_cond2_cond1)  ######like this

问题在于如何正确地将条件传递给每个级别(如果我要将所有条件存储在列表中(实际上甚至没有考虑过这个问题))。

NB:每个过滤器的结果应导出到单独的CSV文件中。

例如:

df11.to_csv('CITY == 'Mumbai' & LANGUAGE == 'English' & GENRE == 'ACTION' & count_GENRE >= 1')

作为新手,我不知道如何使用多进程(特别是语法和执行方式)。但是不幸的是要完成这个任务。因此无法发布任何代码。
所以有人能给一个实现这个任务的代码示例吗?
如果你有更好的想法(例如类对象或节点遍历),请提出建议。

“huge”是什么意思?你有一个数字吗?例如,数据框中的内存大小或行*列数? - Steve Misuta
@SteveMisuta-实际上,它包含大约1.6亿个记录,并且我必须同时运行2-3个其他.py文件,这些文件在内部并行访问该数据帧。这就是我遇到内存错误(超出RAM容量)的地方。而我对此无能为力(分别运行所有这些py文件)。 - Satya
@SteveMisuta-如果您有任何想法,例如列表迭代方式,然后检查第一个字符(如果以sub_或sub_cond1开头之类的)。那么我也可以接受,请建议。 - Satya
如果您正在从文件中将数据读入内存,那么可以考虑使用分块技术来读取较小的部分并按顺序处理/输出每个部分。Pandas read_csv方法具有chunks和iterator关键字参数,用于按顺序读取主数据集的子集。由于您没有进行任何分组而仅进行过滤,因此可以按顺序处理整个数据集。例如:将数据集分成N个块,逐个读入每个块,进行过滤,写入csv文件,删除内存中的数据帧,读取下一个块,过滤,将输出附加到csv文件中等。 - Steve Misuta
请在此处查看文档:http://pandas.pydata.org/pandas-docs/stable/io.html#io-chunking - Steve Misuta
@SteveMisuta-我已经使用过那种技术(对我的情况没有帮助),但问题是我的处理基于日期时间(一列)字段,而依赖关系是我需要整个数据作为一个单位进行处理。顺便说一下,感谢您的建议。 - Satya
1个回答

21

看起来这似乎是适合使用 dask 的问题,它是一个帮助你处理大于内存数据的 Python 模块。

我将展示如何使用 dask.dataframe 解决这个问题。让我们从创建一些数据开始:

import pandas as pd
from collections import namedtuple
Record = namedtuple('Record', "CITY LANGUAGE GENRE count_GENRE count_LANGUAGE")

cities = ['Mumbai', 'Chennai', 'Bengalaru', 'Kolkata']
languages = ['English', 'Hindi', 'Spanish', 'French']
genres = ['Action', 'Romance', 'Comedy', 'Drama']

import random

df = pd.DataFrame([Record(random.choice(cities), 
                          random.choice(languages), 
                          random.choice(genres), 
                          random.choice([1,2,3]), 
                          random.choice([1,2,3])) for i in range(4000000)])

df.to_csv('temp.csv', index=False)    
print(df.head())

        CITY LANGUAGE    GENRE  count_GENRE  count_LANGUAGE
0    Chennai  Spanish   Action            2               1
1  Bengalaru  English    Drama            2               3
2    Kolkata  Spanish   Action            2               1
3     Mumbai   French  Romance            1               2
4    Chennai   French   Action            2               3

以上创建的数据共有400万行,占用107 MB。虽然不是大于内存的数据,但在这个例子中足够使用。

下面展示一个python会话的记录,其中根据问题的标准筛选了数据:

>>> import dask.dataframe as dd
>>> dask_df = dd.read_csv('temp.csv', header=0)
>>> dask_df.npartitions
4

# We see above that dask.dataframe has decided to split the 
# data into 4 partitions

# We now execute the query:
>>> result = dask_df[(dask_df['CITY'] == 'Mumbai') &
...                  (dask_df['LANGUAGE'] == 'English') &
...                  (dask_df['GENRE'] == 'Action') &
...                  (dask_df['count_GENRE'] > 1)]
>>>

# The line above takes very little time to execute.  In fact, nothing has
# really been computed yet.  Behind the scenes dask has create a plan to  
# execute the query, but has not yet pulled the trigger.

# The result object is a dask dataframe:
>>> type(result)
<class 'dask.dataframe.core.DataFrame'>
>>> result
dd.DataFrame<series-slice-read-csv-temp.csv-fc62a8c019c213f4cd106801b9e45b29[elemwise-cea80b0dd8dd29ae325a9db1896b027c], divisions=(None, None, None, None, None)>

# We now pull the trigger by calling the compute() method on the dask
# dataframe.  The execution of the line below takes a few seconds:
>>> dfout = result.compute()

# The result is a regular pandas dataframe:
>>> type(dfout)
<class 'pandas.core.frame.DataFrame'>

# Of our 4 million records, only ~40k match the query:
>>> len(dfout)
41842

>>> dfout.head()
       CITY LANGUAGE   GENRE  count_GENRE  count_LANGUAGE
225  Mumbai  English  Action            2               3
237  Mumbai  English  Action            3               2
306  Mumbai  English  Action            3               3
335  Mumbai  English  Action            2               2
482  Mumbai  English  Action            2               3

我希望这可以让你开始解决你的问题。如果需要更多关于dask的信息,请查看教程示例


@Pedro-感谢您提供新的方法。我还没有测试过您的示例。但是,从400万条记录中筛选出需要多长时间(大约)? - Satya
1
@Satya,dask实际上使用了multiprocessing,因此如果您有多个核心,您将看到一些加速。 - Pedro M Duarte
@Satya,你面临的情况是各个查询具有一些共同的条件。如果共同条件的子集足以将结果大小缩小到合理范围内,则最好的方法是:1.使用dask按照最小的共同条件子集过滤数据,以将数据大小减少到RAM的合理分数。2.计算dask结果以获取pandas数据帧。3.在生成的pandas数据帧上查询每个查询的剩余条件。 - Pedro M Duarte
@Pedro,还有一件事,我能在Dask DataFrame上应用chunksize过滤器(像在Pandas DataFrame的情况下取一个数据块,然后执行一些操作,再合并它们--发送回去)吗? - Satya
1
这篇文章让我想尝试一下Dask。很棒!谢谢! - GrimSqueaker
显示剩余8条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接