从S3流式传输/分块csv文件到Python

13

我打算在S3中存储的非常大的csv文件上执行一些占用内存的操作,使用Python,并打算将脚本移动到AWS Lambda。我知道我可以将整个csv文件读入内存,但是对于这样一个大文件,我肯定会遇到Lambda的内存和存储限制,是否有办法使用boto3 / botocore以流式传输的方式或一次只读取csv的一部分到Python中,最好是通过指定要读取的行号来实现?

以下是我已经尝试过的一些方法:

1)使用S3.get_object中的range参数指定要读取的字节范围。不幸的是,这意味着最后几行会被截断,因为没有办法指定要读取的行数。存在一些混乱的解决方案,例如扫描最后一个换行符,记录索引,然后将其作为下一个字节范围的起点,但如果可能的话,我想避免这种笨拙的解决方案。

2)使用S3 select编写SQL查询,从S3存储桶中选择性地检索数据。不幸的是,row_numbers SQL函数不受支持,而且似乎没有一种方法可以读取一部分行。

3个回答

11

假设您的文件没有被压缩,这将涉及从流中读取并在换行符上分割。读取一块数据,找到该块中最后一个换行符的实例,分割并处理。

s3 = boto3.client('s3')
body = s3.get_object(Bucket=bucket, Key=key)['Body']

# number of bytes to read per chunk
chunk_size = 1000000

# the character that we'll split the data with (bytes, not string)
newline = '\n'.encode()   
partial_chunk = b''

while (True):
    chunk = partial_chunk + body.read(chunk_size)

    # If nothing was read there is nothing to process
    if chunk == b'':
        break

    last_newline = chunk.rfind(newline)

    # write to a smaller file, or work against some piece of data
    result = chunk[0:last_newline+1].decode('utf-8')

    # keep the partial line you've read here
    partial_chunk = chunk[last_newline+1:]

如果您有gzip文件,则需要在循环内使用BytesIOGzipFile类;这是一个更难的问题,因为您需要保留Gzip压缩细节。


1
看起来 body = s3.get_object(Bucket=bucket, Key=key).read() 需要替换为 body = s3.get_object(Bucket=bucket, Key=key)['Body'] - user 923227
1
一个需要强调的问题:当块是 b'' 时,即我们需要中断时,用户需要处理 partial_chunk,否则他们会错过文件的结尾... - Andrew Alcock
1
另一个问题是,如果没有换行符,代码最终会产生一个单个字符结果块,并且处理过的字符也会留在“partial_chunk”中,在下一次循环中进行处理。 - Andrew Alcock

2

这在Excel中不起作用。 - TNN

2

我开发了一个类似于@Kirk Broadhurst的代码,但是如果每个块的处理时间超过5分钟(大约),连接超时会发生。以下代码通过为每个块打开一个新连接来解决这个问题。

import boto3
import pandas as pd
import numpy as np

# The following credentials should not be hard coded, it's best to get these from cli.
region_name = 'region'
aws_access_key_id = 'aws_access_key_id'
aws_secret_access_key = 'aws_secret_access_key'

s3 =boto3.client('s3',region_name=region_name,aws_access_key_id=aws_access_key_id, aws_secret_access_key=aws_secret_access_key)

obj = s3.get_object(Bucket='bucket', Key='key')

total_bytes = obj['ContentLength']
chunk_bytes = 1024*1024*5 # 5 MB as an example.
floor = int(total_bytes//chunk_bytes)
whole = total_bytes/chunk_bytes
total_chunks = [1+floor if floor<whole else floor][0]

chunk_size_list = [(i*chunk_bytes, (i+1)*chunk_bytes-1) for i in range(total_chunks)]
a,b = chunk_size_list[-1]
b = total_bytes
chunk_size_list[-1] = (a,b)
chunk_size_list = [f'bytes={a}-{b}' for a,b in chunk_size_list]

prev_str = ''

for i,chunk in enumerate(chunk_size_list):
    s3 = boto3.client('s3', region_name=region_name, aws_access_key_id=aws_access_key_id, 
                      aws_secret_access_key=aws_secret_access_key)
    byte_obj = s3.get_object(Bucket='bucket', Key='key', Range=chunk_size_list[i])
    byte_obj = byte_obj['Body'].read()
    str_obj = byte_obj.decode('utf-8')
    del byte_obj
    list_obj = str_obj.split('\n')
    # You can use another delimiter instead of ',' below.
    if len(prev_str.split(',')) < len(list_obj[1].split(',')) or len(list_obj[0].split(',')) < len(list_obj[1].split(',')):
        list_obj[0] = prev_str+list_obj[0]
    else:
        list_obj = [prev_str]+list_obj
    prev_str = list_obj[-1]
    del str_obj, list_obj[-1] 
    list_of_elements = [st.split(',') for st in list_obj]
    del list_obj
    df = pd.DataFrame(list_of_elements)
    del list_of_elements
    gc.collect()
    # You can process your pandas dataframe here, but you need to cast it to correct datatypes.
    # casting na values to numpy nan type.
    na_values = ['', '#N/A', '#N/A N/A', '#NA', '-1.#IND', '-1.#QNAN', '-NaN', '-nan', '1.#IND', '1.#QNAN', 'N/A', 'NA', 'NULL', 'NaN', 'n/a', 'nan', 'null']
    df = df.replace(na_values, np.nan)
    dtypes = {col1: 'float32', col2:'category'}
    df = df.astype(dtype=dtypes, copy=False)

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接