你能否使用流而不是本地文件上传到S3?

77

我需要创建一个CSV并将其上传到S3桶。由于我是即时创建文件,因此最好能够在创建文件时直接将其写入S3桶,而不是先将整个文件写入本地,然后在最后上传文件。

有没有方法可以做到这一点?我的项目使用Python语言,我对该语言还比较新。以下是我迄今为止尝试过的内容:

import csv
import csv
import io
import boto
from boto.s3.key import Key


conn = boto.connect_s3()
bucket = conn.get_bucket('dev-vs')
k = Key(bucket)
k.key = 'foo/foobar'

fieldnames = ['first_name', 'last_name']
writer = csv.DictWriter(io.StringIO(), fieldnames=fieldnames)
k.set_contents_from_stream(writer.writeheader())
我收到了这个错误:BotoClientError: s3不支持分块传输。
更新:我找到了一种直接写入S3的方法,但是我找不到一种在不删除已写入的行的情况下清空缓冲区的方法。因此,例如:
conn = boto.connect_s3()
bucket = conn.get_bucket('dev-vs')
k = Key(bucket)
k.key = 'foo/foobar'

testDict = [{
    "fieldA": "8",
    "fieldB": None,
    "fieldC": "888888888888"},
    {
    "fieldA": "9",
    "fieldB": None,
    "fieldC": "99999999999"}]

f = io.StringIO()
fieldnames = ['fieldA', 'fieldB', 'fieldC']
writer = csv.DictWriter(f, fieldnames=fieldnames)
writer.writeheader()
k.set_contents_from_string(f.getvalue())

for row in testDict:
    writer.writerow(row)
    k.set_contents_from_string(f.getvalue())

f.close()

向文件写入3行内容,但是我无法释放内存以写入一个大文件。如果我添加:

f.seek(0)
f.truncate(0)

如果将文件打开到循环中,那么只有文件的最后一行被写入。有没有办法在不删除文件行的情况下释放资源?


即使你可以随意写入S3,由于一致性的挑战,我仍不建议这样做。为什么你认为本地写入不好?如果出现异常或问题,你想要一个部分的S3对象吗?我想不会。 - cgseller
3
我希望直接写入以提高效率。如果我在本地编写文件并上传,则需要额外添加上传步骤,并清理本地文件。我不介意有一个不完整的文件——如果我在本地编写它,也可能是不完整的。该系统是幂等的,会删除错误状态的文件或继续执行。 - inquiring minds
7个回答

57

我已经找到了解决我的问题的方法,并且在这里发布以供其他人参考。我决定将其作为多部分上传的一部分来完成。您无法直接流式传输到S3。但是有一个可用的软件包可以将您的流文件转换为多部分上传,我使用的是:Smart Open

import smart_open
import io
import csv

testDict = [{
    "fieldA": "8",
    "fieldB": None,
    "fieldC": "888888888888"},
    {
    "fieldA": "9",
    "fieldB": None,
    "fieldC": "99999999999"}]

fieldnames = ['fieldA', 'fieldB', 'fieldC']
f = io.StringIO()
with smart_open.smart_open('s3://dev-test/bar/foo.csv', 'wb') as fout:
    writer = csv.DictWriter(f, fieldnames=fieldnames)
    writer.writeheader()
    fout.write(f.getvalue())

    for row in testDict:
        f.seek(0)
        f.truncate(0)
        writer.writerow(row)
        fout.write(f.getvalue())

f.close()

对于Python 2,请确保使用StringIO.StringIO()而不是io.StringIO(),否则您将收到编码错误。 - Anconia
@有疑问的朋友,这是一个好答案。我的用例与你的几乎相同,只是不同之处在于我想生成XML而不是CSV。由于我喜欢使用像Mako/genshi这样的模板选项来生成XML,你能否建议我如何处理它?(同时生成和写入,而不是先本地写入) - Ahsanul Haque
StringIO不需要二进制模式,将模式从'wb'更改为'w'解决了我的问题。 - Debodirno Chandra

2
根据文档,这是可能的。
s3.Object('mybucket', 'hello.txt').put(Body=open('/tmp/hello.txt', 'rb'))

因此,我们可以以普通方式使用StringIO

更新:@inquiring minds的答案中的smart_open库是更好的解决方案


8
我不明白如何使用这个。 /tmp/hello.txt 不是一个本地文件吗?这正是我们要避免的。 - EthanP
1
不,根据这个问题单,它不被支持。 使用S3流的想法是为了在需要上传几个GB的大文件时避免使用静态文件。我也正在尝试解决这个问题 - 我需要从mongodb中读取大量数据并将其放入S3,我不想使用文件。 - baldr
@baldr 嗯,这个技巧在过去对我有效。顺便说一下,在你的消息中提到的那个票据中,我看到另一个有用的方法。不幸的是,我现在不再与亚马逊合作,无法测试它。 - El Ruso
3
我尝试深入研究 boto 的源代码,发现它需要为每个发送的文件计算 MD5 校验和。这意味着流至少应该是“可定位的”。由于我从mongodb中读取数据并且无法轻松地倒回数据流,因此我有非可定位的流。这里推荐使用的 smart_open 允许使用流,但它只使用内部缓冲区,然后也使用 boto 进行“分块上传”。从技术上讲,可以使用类似文件的流,但是要准备好可能需要大量内存的情况。使用流的想法是使用低内存上传(可能是)无限的数据流。 - baldr
@el-ruso,是的,这正是smart_open的工作方式。而且似乎这是上传这些文件的唯一方法。您通过较小的块上传大文件。我不会称其为“流式上传”,只是“分块上传”。 - baldr
显示剩余2条评论

1
我们试图将文件内容上传到s3,但是在Django请求中以InMemoryUploadedFile对象的形式出现。我们最终采取了以下方法,因为我们不想在本地保存文件。希望这可以帮到你:
@action(detail=False, methods=['post'])
def upload_document(self, request):
     document = request.data.get('image').file
     s3.upload_fileobj(document, BUCKET_NAME, 
                                 DESIRED_NAME_OF_FILE_IN_S3, 
                                 ExtraArgs={"ServerSideEncryption": "aws:kms"})

1
虽然这种方法可行,但它并不意味着流式传输 - 因为InMemoryUploadedFile会将整个文件保存在RAM中。内存中的文件相对较小 - 它们不是即时生成的。 - Eugene

0

这里是一个完整的使用 boto3 的示例:

import boto3
import io

session = boto3.Session(
    aws_access_key_id="...",
    aws_secret_access_key="..."
)

s3 = session.resource("s3")

buff = io.BytesIO()

buff.write("test1\n".encode())
buff.write("test2\n".encode())

s3.Object(bucket, keypath).put(Body=buff.getvalue())

2
我给你点了踩是因为buff.getvalue()显然不是一个流,而是一个bytes对象。详见https://docs.python.org/3/library/io.html#io.BytesIO.getvalue - mdurant

0

有一个广泛支持的库可以做到这一点:

pip install s3fs

s3fs非常容易使用:

import s3fs

s3fs.S3FileSystem(anon=False)

with s3.open('mybucket/new-file', 'wb') as f:
    f.write(2*2**20 * b'a')
    f.write(2*2**20 * b'a')

顺便提一下,boto3(由AWS API支持)中还有一个名为MultiPartUpload的东西。

这不是作为Python流来分解的,这对某些人可能是一个优势。相反,您可以开始上传并逐个发送部分。


0
在 GitHub 的一个 smart_open 问题(#82)中提到了一个有趣的代码解决方案,我一直想尝试一下。为了记录,复制粘贴在这里...看起来需要 boto3
csv_data = io.BytesIO()
writer = csv.writer(csv_data)
writer.writerows(my_data)

gz_stream = io.BytesIO()
with gzip.GzipFile(fileobj=gz_stream, mode="w") as gz:
    gz.write(csv_data.getvalue())
gz_stream.seek(0)

s3 = boto3.client('s3')
s3.upload_fileobj(gz_stream, bucket_name, key)

这个特定的例子正在将流数据传输到一个压缩的 S3 键/文件,但似乎通用的方法是使用 boto3 S3 客户端的 upload_fileobj() 方法与目标流一起使用,而不是文件。


你能解释一下这里的my_data是什么吗?它是一个列表还是字典? - User1011
根据这个StackOverflow的答案,writer.writerows()将可迭代的可迭代对象——如列表、数组等——作为输入:https://dev59.com/ElwY5IYBdhLWcg3wCUA7#33092057 - Mass Dot Net

-4

要将字符串写入S3对象,请使用:

s3.Object('my_bucket', 'my_file.txt').put('Hello there')

所以将流转换为字符串,你就完成了。


只有当对象的大小不超过内存限制时,这才能正常工作。 - erik258

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接