阅读一个巨大的 .csv 文件

153

我目前正在尝试使用Python 2.7从.csv文件中读取数据,其中包含最多1百万行、200列的数据(文件大小在100MB到1.6GB之间)。对于行数小于300,000的文件,我可以做到这一点(但速度非常慢),但是一旦超过这个范围,就会出现内存错误。我的代码如下:

def getdata(filename, criteria):
    data=[]
    for criterion in criteria:
        data.append(getstuff(filename, criteron))
    return data

def getstuff(filename, criterion):
    import csv
    data=[]
    with open(filename, "rb") as csvfile:
        datareader=csv.reader(csvfile)
        for row in datareader: 
            if row[3]=="column header":
                data.append(row)
            elif len(data)<2 and row[3]!=criterion:
                pass
            elif row[3]==criterion:
                data.append(row)
            else:
                return data

在 getstuff 函数中使用 else 子句的原因是,所有符合条件的元素将在 csv 文件中一起列出,因此当超过这些元素时,我会离开循环以节省时间。

我的问题是:

  1. 如何使其能够处理更大的文件?

  2. 有什么方法可以加快速度吗?

我的计算机搭载8GB RAM,运行64位Windows 7系统,处理器为3.40 GHz(不确定您需要哪些信息)。


2
我知道有几个看起来相似的问题,但是没有一个似乎足够具体以帮助我解决问题。如果我错过了其中一个,请原谅。 - Charles Dillon
2
你应该将读取的数据存储在数据库中(例如Sqlite),而不是保存在内存中。然后,您可以在数据库上运行进一步的处理,如过滤。 - Michael Butscher
8个回答

192

您正在将所有行读入到列表中,然后再处理该列表。 不要这样做

在生成它们时处理您的行。如果您需要先过滤数据,请使用生成器函数:

import csv

def getstuff(filename, criterion):
    with open(filename, "rb") as csvfile:
        datareader = csv.reader(csvfile)
        yield next(datareader)  # yield the header row
        count = 0
        for row in datareader:
            if row[3] == criterion:
                yield row
                count += 1
            elif count:
                # done when having read a consecutive series of rows 
                return

我还简化了你的过滤器测试; 逻辑相同但更为简洁。因为你只匹配符合条件的一系列行,所以也可以使用:
import csv
from itertools import dropwhile, takewhile

def getstuff(filename, criterion):
    with open(filename, "rb") as csvfile:
        datareader = csv.reader(csvfile)
        yield next(datareader)  # yield the header row
        # first row, plus any subsequent rows that match, then stop
        # reading altogether
        # Python 2: use `for row in takewhile(...): yield row` instead
        # instead of `yield from takewhile(...)`.
        yield from takewhile(
            lambda r: r[3] == criterion,
            dropwhile(lambda r: r[3] != criterion, datareader))
        return

您现在可以直接对getstuff()进行循环。在getdata()中执行相同的操作:
def getdata(filename, criteria):
    for criterion in criteria:
        for row in getstuff(filename, criterion):
            yield row

现在可以直接在你的代码中循环使用getdata()

for row in getdata(somefilename, sequence_of_criteria):
    # process row

现在你只需要在内存中保存一行,而不是按照每个标准保存成千上万行。

yield会使函数成为一个生成器函数,这意味着在开始循环之前它不会执行任何操作。


你认为这是什么? - Tariq Ahmed
1
@TariqAhmed:有关系吗?这取决于他们的CSV数据中包含什么。搜索的是第四列中的内容。 - Martijn Pieters

78

虽然Martijin的答案可能是最好的,但这里提供一种更直观的方法来处理大型csv文件,适合初学者。这样可以让您一次处理一组行或块。

import pandas as pd
chunksize = 10 ** 8
for chunk in pd.read_csv(filename, chunksize=chunksize):
    process(chunk)

11
为什么使用Pandas会更加直观? - wwii
48
对于像我这样的新手来说,4行代码总是更好。 - mmann1123
4
Python常规代码同样简洁,可以逐行处理。生成器函数仅用于过滤内容;如何在 Pandas 中进行同样的过滤? - Martijn Pieters
1
即使某些行的内容跨越多行,它仍然可以很好地工作! - Dielson Sales
1
@user5359531 数据准备和分析通常使用pandas数据框架完成。以csv文件的形式加载输入数据非常普遍。另请参阅其他答案中提到的参数'chunksize'和'usecols'。 - Paul Rougieux
显示剩余4条评论

26

我进行了相当数量的振动分析,并查看了大量数据集(数千万和数亿个数据点)。我的测试表明,pandas.read_csv() 函数比 numpy.genfromtxt() 快20倍。而 genfromtxt() 函数比 numpy.loadtxt() 快3倍。对于大型数据集,似乎需要使用 pandas。

我在一篇讨论振动分析中MATLAB与Python速度比较的博客中发布了我用于此测试的代码和数据集。


7
原帖的主要问题不是速度,而是内存耗尽。使用不同的函数来处理文件本身并不能消除将其读入列表而不是使用流处理器所带来的缺点。 - pydsigner

13

对于遇到这个问题的人。使用pandas的‘chunksize’和‘usecols’参数,帮助我比其他提议的选项更快地读取一个大型压缩文件。

import pandas as pd

sample_cols_to_keep =['col_1', 'col_2', 'col_3', 'col_4','col_5']

# First setup dataframe iterator, ‘usecols’ parameter filters the columns, and 'chunksize' sets the number of rows per chunk in the csv. (you can change these parameters as you wish)
df_iter = pd.read_csv('../data/huge_csv_file.csv.gz', compression='gzip', chunksize=20000, usecols=sample_cols_to_keep) 

# this list will store the filtered dataframes for later concatenation 
df_lst = [] 

# Iterate over the file based on the criteria and append to the list
for df_ in df_iter: 
        tmp_df = (df_.rename(columns={col: col.lower() for col in df_.columns}) # filter eg. rows where 'col_1' value grater than one
                                  .pipe(lambda x:  x[x.col_1 > 0] ))
        df_lst += [tmp_df.copy()] 

# And finally combine filtered df_lst into the final lareger output say 'df_final' dataframe 
df_final = pd.concat(df_lst)

11
我的经验告诉我,速度极快的是以下内容:
import pandas as pd
import dask.dataframe as dd
import time
t=time.clock()
df_train = dd.read_csv('../data/train.csv', usecols=[col1, col2])
df_train=df_train.compute()
print("load train: " , time.clock()-t)

另一个可行的解决方案是:
import pandas as pd 
from tqdm import tqdm

PATH = '../data/train.csv'
chunksize = 500000 
traintypes = {
'col1':'category',
'col2':'str'}

cols = list(traintypes.keys())

df_list = [] # list to hold the batch dataframe

for df_chunk in tqdm(pd.read_csv(PATH, usecols=cols, dtype=traintypes, chunksize=chunksize)):
    # Can process each chunk of dataframe here
    # clean_data(), feature_engineer(),fit()

    # Alternatively, append the chunk to list and merge all
    df_list.append(df_chunk) 

# Merge all dataframes into one dataframe
X = pd.concat(df_list)

# Delete the dataframe list to release memory
del df_list
del df_chunk

你第一个解决方案中的 df_train=df_train.compute() 这一行不是把整个数据集加载到内存中了吗...这不是他试图避免的吗? - Sam Dillard
time.clock()在Python 3.3中已经被废弃,并将从Python 3.8中删除:请改用time.perf_counter()time.process_time() - Archon

2

以下是Python3的另一种解决方案:

import csv
with open(filename, "r") as csvfile:
    datareader = csv.reader(csvfile)
    count = 0
    for row in datareader:
        if row[3] in ("column header", criterion):
            doSomething(row)
            count += 1
        elif count > 2:
            break

这里的datareader是一个生成器函数。


抱歉,它并不像使用yield操作符的解决方案一样高效。回调函数调用会增加更多的开销,特别是因为你必须显式地分别处理状态。 - Martijn Pieters
@MartijnPieters 谢谢。已更新答案。 - Rishabh Agrahari

1
如果您正在使用pandas并且有足够的RAM(足以将整个文件读入内存),请尝试使用pd.read_csvlow_memory=False,例如:
import pandas as pd
data = pd.read_csv('file.csv', low_memory=False)

0
所有这些答案都在最后使用了pd.concat(),假设筛选后的数据框适合内存,但如果数据框真的非常大,你可以采用这种方法来读取块并在进行过程中追加到文件中。你还可以将多进程并入这种方法,并在文件写入时加锁。
input_filename= 'input.csv'
out_filename = 'output.csv'

chunks= pd.read_csv(input_filename, chunksize=80000)
for i,chunk in enumerate(chunks):
    chunk = chunk[chunk.Availability != "Out of Stock"]
    chunk.to_csv(out_filename, header=(i==0), mode='a', index=False)

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接