从一个大的CSV文件中随机读取一个小样本,并将其加载到Pandas数据框中。

108
我想读取的CSV文件太大,无法放入主内存中。我该如何随机读取其中几千行数据,并对所选数据框进行简单统计分析呢?

1
你可以使用 nrowsskiprows 参数来读取特定数量的行并跳过它们,但我不知道如何使用 read_csv 读取随机数量的行。 - EdChum
参见相关链接:https://dev59.com/vGgv5IYBdhLWcg3wJNZF?rq=1,尽管这里的问题是将数据框追加了10,000次。即使您为临时存储构建了列表或字典,这也会很慢且浪费资源,我认为。 - EdChum
1
这是如何使用HDF5文件完成的;将您的csv文件转换为HDF5,然后使用此方法:https://dev59.com/R3vZa4cB1Zd3GeqP_iRX - Jeff
对于每个第k行,使用skiprows=lambda i: i % k - william_grisaitis
13个回答

107
假设CSV文件中没有表头:
import pandas
import random

n = 1000000 #number of records in file
s = 10000 #desired sample size
filename = "data.txt"
skip = sorted(random.sample(range(n),n-s))
df = pandas.read_csv(filename, skiprows=skip)

如果read_csv有一个keeprows的话,或者skiprows接受一个回调函数而不是一个列表,那就更好了。

带有标题和未知文件长度:

import pandas
import random

filename = "data.txt"
n = sum(1 for line in open(filename)) - 1 #number of records in file (excludes header)
s = 10000 #desired sample size
skip = sorted(random.sample(range(1,n+1),n-s)) #the 0-indexed header will not be included in the skip list
df = pandas.read_csv(filename, skiprows=skip)

4
请确保您先阅读标题,或者干脆把它砍掉,如果您不这样做,会发生不好的事情。 - ekta
1
编辑以包括文件具有标题的情况。 - dlm
1
现在 skiprows 接受一个可调用对象,请参见下面的答案:https://dev59.com/w2Eh5IYBdhLWcg3wk0Oz#48589768 - exp1orer
6
使用range()函数来操作Python 3.x。 - GileBrt

74

@dlm的答案很棒,但自从v0.20.0以后,skiprows接受可调用对象。可调用对象接收行号作为参数。

还要注意一点,他们对于未知文件长度的解决方案依赖于两次迭代文件--一次获取长度,另一次读取csv。我这里有三个解决方案,它们只依赖于一次迭代文件,但它们都有权衡。

解决方案1:大致百分比

如果您可以指定您想要的行数是总行数的百分之几,而不是多少行,那么您甚至不需要获取文件大小,只需要读取一次文件即可。 假设第一行是标题:

import pandas as pd
import random
p = 0.01  # 1% of the lines
# keep the header, then take only 1% of lines
# if random from [0,1] interval is greater than 0.01 the row will be skipped
df = pd.read_csv(
         filename,
         header=0, 
         skiprows=lambda i: i>0 and random.random() > p
)

正如评论中指出的那样,这只能给出大约正确的行数,但我认为它满足了所需的用例。

解决方案2:每N行

这实际上不是一个随机样本,但根据您的输入如何排序以及您想要实现什么目标,这可能符合您的需求。

n = 100  # every 100th line = 1% of the lines
df = pd.read_csv(filename, header=0, skiprows=lambda i: i % n != 0)

解决方案3:水塘抽样

(新增于2021年7月)

水塘抽样是一种优雅的算法,用于从流式数据中随机选择k个项目,其长度未知,但只能观察一次。

最大的优点是您可以在没有完整数据集的情况下使用它,并且它可以给您一个确切大小的样本,而不知道完整数据集的大小。缺点是我没有看到纯粹使用pandas实现它的方法,我认为您需要跳到python来读取文件,然后构建数据帧。因此,您可能会失去一些来自read_csv的功能,或者需要重新实现它,因为我们没有使用pandas来实际读取文件。

参考Oscar Benjamin在这里的算法实现:

from math import exp, log, floor
from random import random, randrange
from itertools import islice
from io import StringIO

def reservoir_sample(iterable, k=1):
    """Select k items uniformly from iterable.

    Returns the whole population if there are k or fewer items

    from https://bugs.python.org/issue41311#msg373733
    """
    iterator = iter(iterable)
    values = list(islice(iterator, k))

    W = exp(log(random())/k)
    while True:
        # skip is geometrically distributed
        skip = floor( log(random())/log(1-W) )
        selection = list(islice(iterator, skip, skip+1))
        if selection:
            values[randrange(k)] = selection[0]
            W *= exp(log(random())/k)
        else:
            return values

def sample_file(filepath, k):
    with open(filepath, 'r') as f:
        header = next(f)
        result = [header] + sample_iter(f, k)
    df = pd.read_csv(StringIO(''.join(result)))

reservoir_sample函数返回一个字符串列表,每个字符串是一行数据,因此我们只需在最后将其转换为数据框。这假设有且仅有一个标题行,我还没有考虑如何扩展到其他情况。

我在本地测试过,它比其他两个解决方案快得多。使用一个550 MB的csv文件(来自纽约市交通局出租车旅行记录2020年1月),解决方案3执行时间大约为1秒,而其他两个需要大约3-4秒。

在我的测试中,这甚至比@Bar使用shuf答案稍微快了一点(约10-20%),这让我很惊讶。


6
此解决方案不能确保准确的 X% 行数。如果 random.random() 总是返回大于 0.01 的数字怎么办? - codefreak
1
@codefreak,你说得对,它并不能保证完全达到X%。 - exp1orer
1
这里有另一个Python中的reservoir_sample(it, k)实现(也许不太高效,但更易于理解)。 - jfs
@exp1orer,这个答案里应该包括 sample_iter 吗? - user3494047

37

这不是Pandas中的内容,但是它可以通过bash更快地实现相同的结果,同时不会将整个文件读入内存

shuf -n 100000 data/original.tsv > data/sample.tsv
shuf 命令将对输入进行随机排列,-n 参数用于指定输出中需要多少行。
参考问题:https://unix.stackexchange.com/q/108581 使用 7M 行 csv 进行基准测试,数据可在此处下载(2008):
最佳答案:
def pd_read():
    filename = "2008.csv"
    n = sum(1 for line in open(filename)) - 1 #number of records in file (excludes header)
    s = 100000 #desired sample size
    skip = sorted(random.sample(range(1,n+1),n-s)) #the 0-indexed header will not be included in the skip list
    df = pandas.read_csv(filename, skiprows=skip)
    df.to_csv("temp.csv")

pandas 的时间

%time pd_read()
CPU times: user 18.4 s, sys: 448 ms, total: 18.9 s
Wall time: 18.9 s

使用 shuf 命令时:

time shuf -n 100000 2008.csv > temp.csv

real    0m1.583s
user    0m1.445s
sys     0m0.136s

因此,shuf的速度大约快了12倍,而且最重要的是不会读取整个文件到内存中


6
我建议删去“header”行(例如可以用“tail”命令)。 - Brandt
1
tail -n +2 <file> | shuf -n <nrows> -o <newfile> && sed -i '1i<header>' <newfile> 看起来并不是很精致,但对我来说很有效。 - nikkou
1
对于Mac用户,如果找不到shuf命令,请先使用brew install coreutils进行安装,然后使用等效的gshuf命令。这种解决方案比调用random函数要快得多。 - Shan Dou
对于Windows用户,您可以使用Git Bash(包含在Git for Windows中)。 - Tiago Martins Peres

11

以下是一种算法,不需要事先计算文件中行数,因此您只需要读取文件一次。

假设您需要m个样本。首先,该算法保留前m个样本。当它看到第i个样本(i > m)时,以概率m/i使用该样本来随机替换已选择的样本。

通过这种方式,对于任何i > m,我们始终有一个包含在前i个样本中随机选择的m个样本的子集。

请看下面的代码:

import random

n_samples = 10
samples = []

for i, line in enumerate(f):
    if i < n_samples:
        samples.append(line)
    elif random.random() < n_samples * 1. / (i+1):
            samples[random.randint(0, n_samples-1)] = line

1
但是枚举不需要将整个文件加载到内存中吗? - Randnum

4
以下代码首先读取标题,然后随机抽样其他行:
import pandas as pd
import numpy as np

filename = 'hugedatafile.csv'
nlinesfile = 10000000
nlinesrandomsample = 10000
lines2skip = np.random.choice(np.arange(1,nlinesfile+1), (nlinesfile-nlinesrandomsample), replace=False)
df = pd.read_csv(filename, skiprows=lines2skip)

这个解决方案没有考虑CSV文件中现有行数的数量,这里是静态的。 - Guru Bhandari

3
class magic_checker:
    def __init__(self,target_count):
        self.target = target_count
        self.count = 0
    def __eq__(self,x):
        self.count += 1
        return self.count >= self.target

min_target=100000
max_target = min_target*2
nlines = randint(100,1000)
seek_target = randint(min_target,max_target)
with open("big.csv") as f:
     f.seek(seek_target)
     f.readline() #discard this line
     rand_lines = list(iter(lambda:f.readline(),magic_checker(nlines)))

#do something to process the lines you got returned .. perhaps just a split
print rand_lines
print rand_lines[0].split(",")

我认为类似这样的东西应该可以工作


3

No pandas!

import random
from os import fstat
from sys import exit

f = open('/usr/share/dict/words')

# Number of lines to be read
lines_to_read = 100

# Minimum and maximum bytes that will be randomly skipped
min_bytes_to_skip = 10000
max_bytes_to_skip = 1000000

def is_EOF():
    return f.tell() >= fstat(f.fileno()).st_size

# To accumulate the read lines
sampled_lines = []

for n in xrange(lines_to_read):
    bytes_to_skip = random.randint(min_bytes_to_skip, max_bytes_to_skip)
    f.seek(bytes_to_skip, 1)
    # After skipping "bytes_to_skip" bytes, we can stop in the middle of a line
    # Skip current entire line
    f.readline()
    if not is_EOF():
        sampled_lines.append(f.readline())
    else:
        # Go to the begginig of the file ...
        f.seek(0, 0)
        # ... and skip lines again
        f.seek(bytes_to_skip, 1)
        # If it has reached the EOF again
        if is_EOF():
            print "You have skipped more lines than your file has"
            print "Reduce the values of:"
            print "   min_bytes_to_skip"
            print "   max_bytes_to_skip"
            exit(1)
        else:
            f.readline()
            sampled_lines.append(f.readline())

print sampled_lines

你最终会得到一个sampled_lines列表。你说的统计数据是指什么?

很棒的是,不需要安装模块就可以获得代码...我添加了以下内容以获取txt文件输出 ---- 文件名='random_lines.csv' 目标 = 打开(filename, 'w') 然后在 "if not is_EOF" 中我添加了 target.write(f.readline()) target.write("\n") - GeorgeC

2

在将数据带入Python环境之前,您可以使用10000条记录创建一个样本。

在Windows 10上使用Git Bash,我只需运行以下命令即可生成样本:

shuf -n 10000 BIGFILE.csv > SAMPLEFILE.csv

注意:如果您的CSV文件有标题行,这不是最佳解决方案。


2

使用 子样本

来处理大型CSV文件。

pip install subsample
subsample -n 1000 file.csv > file_1000_sample.csv

创建了一个空文件,但没有填充内容。使用的是Windows10操作系统和Anaconda4环境。 - bmc
对我没用。Shell冻结或者进程非常长。 - Conner M.
当使用pd.read_csv加载时,这将无法工作并失败于ParseError异常。 - Dwipam Katariya

2

TL;DR

如果您知道要采样的大小,但不知道输入文件的大小,则可以使用以下 pandas 代码有效地加载随机采样:

import pandas as pd
import numpy as np

filename = "data.csv"
sample_size = 10000
batch_size = 200

rng = np.random.default_rng()

sample_reader = pd.read_csv(filename, dtype=str, chunksize=batch_size)

sample = sample_reader.get_chunk(sample_size)

for chunk in sample_reader:
    chunk.index = rng.integers(sample_size, size=len(chunk))
    sample.loc[chunk.index] = chunk

解释

并不总是容易知道输入CSV文件的大小。

如果有嵌入的换行符, 像wcshuf这样的工具会给出错误的答案或者只会把你的数据搞乱。

所以,基于desktable答案,我们可以将文件的前sample_size行作为初始样本,然后对于文件中的每一行,随机替换初始样本中的一行。

为了有效地执行此操作, 我们使用一个TextFileReader来加载CSV文件,通过传递chunksize=参数:

sample_reader = pd.read_csv(filename, dtype=str, chunksize=batch_size)

首先,我们得到最初的样本:
sample = sample_reader.get_chunk(sample_size)

然后,我们迭代文件的剩余块,将每个块的索引替换为一系列随机整数,长度与块的大小相同,但其中每个整数都在初始样本的index范围内(这恰好与range(sample_size)相同):
for chunk in sample_reader:
    chunk.index = rng.integers(sample_size, size=len(chunk))

使用这个重新索引的块替换样本中的(一些)行:
sample.loc[chunk.index] = chunk

for循环之后,您将拥有一个数据框,其长度最多为sample_size行,但是这些行是从大型CSV文件中随机选择的。
为了使循环更有效率,您可以使batch_size尽可能大,只要内存允许(如果可以,甚至比sample_size还要大)。
请注意,在使用np.random.default_rng().integers()创建新块索引时,我们使用len(chunk)作为新块索引大小,而不是简单地使用batch_size,因为循环中的最后一个块可能较小。
另一方面,我们使用sample_size而不是len(sample)作为随机整数的“范围”,即使文件中的行数少于sample_size也是如此。这是因为在这种情况下不会剩下任何块需要循环,所以这永远不会成为问题。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接