所以这里是代码,允许您像打开普通文件一样打开Amazon S3上的文件。请注意,我使用
aws
命令,而不是
boto3
Python模块。(我无法访问boto3。)您可以打开文件并在其上查找。文件会在本地缓存。如果您使用Python ZipFile API打开文件并且它是ZipFile,则可以读取各个部分。但是,您无法进行写入,因为S3不支持部分写入。
另外,我实现了s3open()
,它可以打开要读取或写入的文件,但它没有实现ZipFile
所需的查找接口。
from urllib.parse import urlparse
from subprocess import run,Popen,PIPE
import copy
import json
import os
import tempfile
def s3open(path, mode="r", encoding=None):
"""
Open an s3 file for reading or writing. Can handle any size, but cannot seek.
We could use boto.
http://boto.cloudhackers.com/en/latest/s3_tut.html
but it is easier to use the aws cli, since it is present and more likely to work.
"""
from subprocess import run,PIPE,Popen
if "b" in mode:
assert encoding == None
else:
if encoding==None:
encoding="utf-8"
assert 'a' not in mode
assert '+' not in mode
if "r" in mode:
p = Popen(['aws','s3','cp',path,'-'],stdout=PIPE,encoding=encoding)
return p.stdout
elif "w" in mode:
p = Popen(['aws','s3','cp','-',path],stdin=PIPE,encoding=encoding)
return p.stdin
else:
raise RuntimeError("invalid mode:{}".format(mode))
CACHE_SIZE=4096
MAX_READ=65536*16
debug=False
class S3File:
"""Open an S3 file that can be seeked. This is done by caching to the local file system."""
def __init__(self,name,mode='rb'):
self.name = name
self.url = urlparse(name)
if self.url.scheme != 's3':
raise RuntimeError("url scheme is {}; expecting s3".format(self.url.scheme))
self.bucket = self.url.netloc
self.key = self.url.path[1:]
self.fpos = 0
self.tf = tempfile.NamedTemporaryFile()
cmd = ['aws','s3api','list-objects','--bucket',self.bucket,'--prefix',self.key,'--output','json']
data = json.loads(Popen(cmd,encoding='utf8',stdout=PIPE).communicate()[0])
file_info = data['Contents'][0]
self.length = file_info['Size']
self.ETag = file_info['ETag']
self.frontcache = self._readrange(0,CACHE_SIZE)
if self.length > CACHE_SIZE:
self.backcache_start = self.length-CACHE_SIZE
if debug: print("backcache starts at {}".format(self.backcache_start))
self.backcache = self._readrange(self.backcache_start,CACHE_SIZE)
else:
self.backcache = None
def _readrange(self,start,length):
cmd = ['aws','s3api','get-object','--bucket',self.bucket,'--key',self.key,'--output','json',
'--range','bytes={}-{}'.format(start,start+length-1),self.tf.name]
if debug:print(cmd)
data = json.loads(Popen(cmd,encoding='utf8',stdout=PIPE).communicate()[0])
if debug:print(data)
self.tf.seek(0)
return self.tf.read(length)
def __repr__(self):
return "FakeFile<name:{} url:{}>".format(self.name,self.url)
def read(self,length=-1):
if length==-1:
length = min(MAX_READ, self.length - self.fpos + 1)
if debug:
print("read: fpos={} length={}".format(self.fpos,length))
if self.fpos < CACHE_SIZE and self.fpos+length < CACHE_SIZE:
if debug:print("front cache")
buf = self.frontcache[self.fpos:self.fpos+length]
self.fpos += len(buf)
if debug:print("return 1: buf=",buf)
return buf
if self.backcache and (self.length - CACHE_SIZE < self.fpos):
if debug:print("back cache")
buf = self.backcache[self.fpos - self.backcache_start:self.fpos - self.backcache_start + length]
self.fpos += len(buf)
if debug:print("return 2: buf=",buf)
return buf
buf = self._readrange(self.fpos, length)
self.fpos += len(buf)
if debug:print("return 3: buf=",buf)
return buf
def seek(self,offset,whence=0):
if debug:print("seek({},{})".format(offset,whence))
if whence==0:
self.fpos = offset
elif whence==1:
self.fpos += offset
elif whence==2:
self.fpos = self.length + offset
else:
raise RuntimeError("whence={}".format(whence))
if debug:print(" ={} (self.length={})".format(self.fpos,self.length))
def tell(self):
return self.fpos
def write(self):
raise RuntimeError("Write not supported")
def flush(self):
raise RuntimeError("Flush not supported")
def close(self):
return
ZipFile
除了STORE
和DEFLATE
之外不支持任何其他压缩方法(根据ZipFile
文档(compression: ZIP_STORED(无压缩)或ZIP_DEFLATED(需要zlib)
)以及实现;事实上,我基于/usr/lib/python2.7/zipfile.py
中的ZipFile
源代码实现了我的解压逻辑:))(对于错过你已经使用范围查询的事实感到抱歉;看来在发布我的回答之前我没有仔细阅读你的回答!) - Janaka Bandara