用Python顺序读取大型CSV文件

17

我有一个10GB的CSV文件,其中包含一些需要使用的信息。

由于我的电脑内存有限,我无法一次性将整个文件读入内存。因此,我希望可以逐行迭代地读取部分数据。

比如说,在第一次迭代中,我想读取前100行,在第二次迭代中,我想读取101到200行,以此类推。

在Python中,有没有一种高效的方法来实现这个任务呢?Pandas是否提供了一些有用的工具?或者还有其他更好的方法(从内存和速度方面考虑)?


也许可以:https://dev59.com/NWgv5IYBdhLWcg3wSe0f - languitar
虽然在Python中有许多不同的方法可以做到这一点,但有时候将文件(例如使用split -l 100 filename)拆分为更小的文件,在使用Python进行处理之前进行操作会更加实用。 - sphere
4个回答

16

这是简短的答案。

chunksize = 10 ** 6
for chunk in pd.read_csv(filename, chunksize=chunksize):
    process(chunk)

这是非常冗长的答案。

要开始,请导入pandas和sqlalchemy。下面的命令将完成此操作。

import pandas as pd
from sqlalchemy import create_engine

接下来,设置一个指向你的csv文件的变量。虽然不是必需的,但它有助于实现可重复使用性。

file = '/path/to/csv/file'

有了这三行代码,我们已经准备好开始分析数据了。让我们来看一下csv文件的“head”以查看内容可能是什么样子。

print pd.read_csv(file, nrows=5)

这个命令使用pandas的“read_csv”命令读取了5行数据(nrows=5),然后将这些行打印到屏幕上。这让您了解csv文件的结构,并确保数据以适合您工作的方式格式化。

在我们实际处理数据之前,我们需要对其进行某些操作,以便开始筛选数据子集。通常,我会使用pandas的dataframe,但对于大型数据文件,我们需要将数据存储在其他地方。在这种情况下,我们将设置一个本地sqllite数据库,按块读取csv文件,然后将这些块写入sqllite。

要做到这一点,我们首先需要使用以下命令创建sqllite数据库。

csv_database = create_engine('sqlite:///csv_database.db')

接下来,我们需要分块迭代CSV文件,并将数据存储到sqllite中。

chunksize = 100000
i = 0
j = 1
for df in pd.read_csv(file, chunksize=chunksize, iterator=True):
      df = df.rename(columns={c: c.replace(' ', '') for c in df.columns}) 
      df.index += j
      i+=1
      df.to_sql('table', csv_database, if_exists='append')
      j = df.index[-1] + 1

使用此代码,我们将chunksize设置为100,000以保持块的大小可管理,初始化一些迭代器(i = 0,j = 0),然后运行for循环。for循环从CSV文件中读取一块数据,从任何列名中删除空格,然后将该块存储到sqllite数据库中(df.to_sql(…))。

如果您的CSV文件足够大,这可能需要一些时间,但等待的时间是值得的,因为现在您可以使用pandas的“sql”工具从数据库中提取数据而不必担心内存限制。

现在,您可以运行以下命令之类的命令来访问数据:

df = pd.read_sql_query('SELECT * FROM table', csv_database)

当然,使用“select * …”会将所有数据加载到内存中,这正是我们试图避免的问题,因此您应该将筛选器放入选择语句中以过滤数据。例如:

df = pd.read_sql_query('SELECT COl1, COL2 FROM table where COL1 = SOMEVALUE', csv_database)

5

1
在读取每个chunk_df后,我们可以将它们连接起来,以获得一个包含完整CSV文件的数据框。 df = pd.concat(chunk_df, ignore_index=True) - Debashis Sahoo

2
这段代码可能对你的这个任务有所帮助。它可以在大型 .csv 文件中浏览,不会消耗大量内存,因此你可以在标准笔记本电脑上执行此操作。
import pandas as pd
import os

这里的块大小指定了您想要稍后读取的csv文件中的行数

chunksize2 = 2000

path = './'
data2 = pd.read_csv('ukb35190.csv',
                chunksize=chunksize2,
                encoding = "ISO-8859-1")
df2 = data2.get_chunk(chunksize2)
headers = list(df2.keys())
del data2

start_chunk = 0
data2 = pd.read_csv('ukb35190.csv',
                chunksize=chunksize2,
                encoding = "ISO-8859-1",
                skiprows=chunksize2*start_chunk)

headers = []

for i, df2 in enumerate(data2):
try:

    print('reading cvs....')
    print(df2)
    print('header: ', list(df2.keys()))
    print('our header: ', headers)

    # Access chunks within data

    # for chunk in data:

    # You can now export all outcomes in new csv files
    file_name = 'export_csv_' + str(start_chunk+i) + '.csv'
    save_path = os.path.abspath(
        os.path.join(
            path, file_name
        )
    )
    print('saving ...')

except Exception:
    print('reach the end')
    break

0

将大型CSV文件转移到数据库的方法非常好,因为我们可以轻松使用SQL查询。

我们还需要考虑两件事情。

第一点: SQL也不是橡胶,它无法拉伸内存。

例如,转换为bd file

https://nycopendata.socrata.com/Social-Services/311-Service-Requests- from-2010-to-Present/erm2-nwe9

针对此数据库文件的 SQL 语言:

pd.read_sql_query("SELECT * FROM 'table'LIMIT 600000", Mydatabase)

它最多可以读取大约60万条记录,但不超过16GB RAM的PC内存(操作时间为15.8秒)。 直接从csv文件下载可能会有恶意。
giga_plik = 'c:/1/311_Service_Requests_from_2010_to_Present.csv'
Abdul = pd.read_csv(giga_plik, nrows=1100000)

(操作时间16.5秒)

第二点:为了有效地使用从CSV转换的SQL数据系列,我们应该记住适当的日期格式。因此,我建议在ryguy72的代码中添加以下内容:

df['ColumnWithQuasiDate'] = pd.to_datetime(df['Date'])

关于我指出的文件311的所有代码:

start_time = time.time()
### sqlalchemy create_engine
plikcsv = 'c:/1/311_Service_Requests_from_2010_to_Present.csv'
WM_csv_datab7 = create_engine('sqlite:///C:/1/WM_csv_db77.db')
#----------------------------------------------------------------------
chunksize = 100000 
i = 0
j = 1
## --------------------------------------------------------------------
for df in pd.read_csv(plikcsv, chunksize=chunksize, iterator=True, encoding='utf-8', low_memory=False):
      df = df.rename(columns={c: c.replace(' ', '') for c in df.columns}) 
## -----------------------------------------------------------------------
      df['CreatedDate'] = pd.to_datetime(df['CreatedDate'])  # to datetimes
      df['ClosedDate'] = pd.to_datetime(df['ClosedDate'])
## --------------------------------------------------------------------------
      df.index += j
      i+=1
      df.to_sql('table', WM_csv_datab7, if_exists='append')
      j = df.index[-1] + 1
print(time.time() - start_time)

最后我想补充一点:直接从互联网将csv文件转换为数据库似乎不是一个好主意。我建议先下载基础文件,然后在本地进行转换。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接