We have ZIP files that are 5-10GB in size. The typical ZIP file has 5-10 internal files, each 1-5 GB in size uncompressed.
I have a nice set of Python tools for reading these files. Basically, I can open a filename and if there is a ZIP file, the tools search in the ZIP file and then open the compressed file. It's all rather transparent.
I want to store these files in Amazon S3 as compressed files. I can fetch ranges of S3 files, so it should be possible to fetch the ZIP central directory (it's the end of the file, so I can just read the last 64KiB), find the component I want, download that, and stream directly to the calling process.
So my question is, how do I do that through the standard Python ZipFile API? It isn't documented how to replace the filesystem transport with an arbitrary object that supports POSIX semantics. Is this possible without rewriting the module?
Here's an approach which does not need to fetch the entire file (full version available here).
It does require `boto` (or `boto3`), though (unless you can mimic the ranged `GET`s via the AWS CLI, which I guess is quite possible as well).
import sys
import zlib
import zipfile
import io
import boto
from boto.s3.connection import OrdinaryCallingFormat
# range-fetches an S3 key
def fetch(key, start, len):
    """Fetch ``len`` bytes of an S3 key, starting at byte offset ``start``.

    Args:
        key: object exposing ``get_contents_as_string(headers=...)`` (a boto
            S3 key in this file).
        start: offset of the first byte to fetch.
        len: number of bytes to fetch (the name shadows the builtin inside
            this function; it is kept so keyword callers keep working).

    Returns:
        Whatever ``key.get_contents_as_string`` returns for the ranged
        request, or an empty byte string when ``len`` is zero or negative.
    """
    # A non-positive length would build an inverted range such as
    # "bytes=10-9".  That is not a valid byte range; a server that ignores
    # the header would answer with the entire (multi-GB) object, so skip the
    # request altogether.
    if len <= 0:
        return b""
    end = start + len - 1
    return key.get_contents_as_string(headers={"Range": "bytes=%d-%d" % (start, end)})
# parses 2 or 4 little-endian bytes into their corresponding integer value
def parse_int(bytes):
    """Decode a 2- or 4-byte little-endian unsigned integer.

    Args:
        bytes: a byte string (Python 2 ``str`` or Python 3 ``bytes``) or a
            text string whose characters are the byte values.  At least two
            items are required; the third and fourth are used only when at
            least four are present.  The name shadows the builtin inside this
            function; it is kept for interface compatibility.

    Returns:
        The decoded integer.

    Raises:
        IndexError: if fewer than two items are supplied.
    """
    # Indexing Python 3 ``bytes`` already yields ints, while Python 2 ``str``
    # (and Python 3 ``str``) yields 1-character strings that need ``ord``.
    octets = [b if isinstance(b, int) else ord(b) for b in bytes[:4]]
    val = octets[0] + (octets[1] << 8)
    if len(octets) > 3:
        val += (octets[2] << 16) + (octets[3] << 24)
    return val
"""
bucket: name of the bucket
key: path to zipfile inside bucket
entry: pathname of zip entry to be retrieved (path/to/subdir/file.name)
"""
# NOTE: `bucket`, `key` and `entry` are not assigned anywhere in this excerpt;
# they must be bound before this fragment runs.

# OrdinaryCallingFormat prevents certificate errors on bucket names with dots
# https://stackoverflow.com/questions/51604689/read-zip-files-from-amazon-s3-using-boto3-and-python#51605244
_bucket = boto.connect_s3(calling_format=OrdinaryCallingFormat()).get_bucket(bucket)
_key = _bucket.get_key(key)

# fetch the last 22 bytes (end-of-central-directory record; assuming the comment field is empty)
size = _key.size
eocd = fetch(_key, size - 22, 22)
# If the archive carries a comment, the last 22 bytes are not the start of the
# EOCD record.  Fail loudly instead of decoding garbage offsets and issuing a
# bogus range request.
if eocd[0:4] != b"PK\x05\x06":
    raise zipfile.BadZipfile(
        "end-of-central-directory record not found in the last 22 bytes")

# start offset and size of the central directory
# NOTE(review): ZIP64 archives store 0xFFFFFFFF in these fields and keep the
# real values in a separate ZIP64 record; that case is not handled here.
cd_start = parse_int(eocd[16:20])
cd_size = parse_int(eocd[12:16])

# fetch central directory, append EOCD, and open as zipfile!
cd = fetch(_key, cd_start, cd_size)
# NOTE: this rebinds the builtin name `zip` at module level; the name is kept
# so code that refers to it keeps working.
zip = zipfile.ZipFile(io.BytesIO(cd + eocd))

for zi in zip.filelist:
    if zi.filename == entry:
        # local file header starting at file name length + file content
        # (so we can reliably skip file name and extra fields)

        # in our "mock" zipfile, `header_offset`s are negative (probably because the leading content is missing)
        # so we have to add to it the CD start offset (`cd_start`) to get the actual offset
        file_head = fetch(_key, cd_start + zi.header_offset + 26, 4)
        name_len = parse_int(file_head[0:2])
        extra_len = parse_int(file_head[2:4])

        content = fetch(_key, cd_start + zi.header_offset + 30 + name_len + extra_len, zi.compress_size)

        # now `content` has the file entry you were looking for!
        # you should probably decompress it in context before passing it to some other program

        # `print content` is Python 2-only syntax and would make the whole
        # file a SyntaxError on Python 3.  Write the raw bytes instead:
        # Python 3 exposes the byte stream as sys.stdout.buffer, Python 2
        # writes byte strings to sys.stdout directly.
        out = getattr(sys.stdout, "buffer", sys.stdout)
        if zi.compress_type == zipfile.ZIP_DEFLATED:
            # wbits=-15: raw deflate stream, no zlib header or trailer
            out.write(zlib.decompressobj(-15).decompress(content))
        else:
            out.write(content)
        out.write(b"\n")  # the print statement appended a newline; keep it
        break
In your case you might need to write the fetched content to a local file (due to large size), unless memory usage is not a concern.
So here is the code that allows you to open a file on Amazon S3 as if it were a normal file. Notice I use the `aws` command, rather than the `boto3` Python module. (I don't have access to boto3.) You can open the file and seek on it. The file is cached locally. If you open the file with the Python ZipFile API and it's a ZIP file, you can then read individual parts. You can't write, though, because S3 doesn't support partial writes.
Separately, I implement `s3open()`, which can open a file for reading or writing, but it doesn't implement the seek interface, which is required by ZipFile.
from urllib.parse import urlparse
from subprocess import run,Popen,PIPE
import copy
import json
import os
import tempfile
# Tools for reading and writing files from Amazon S3 without boto or boto3
# http://boto.cloudhackers.com/en/latest/s3_tut.html
# but it is easier to use the aws cli, since it's configured to work.
def s3open(path, mode="r", encoding=None):
    """
    Open an s3 file for reading or writing. Can handle any size, but cannot seek.

    The transfer is done by an ``aws s3 cp`` child process; the returned
    object is that process's stdout pipe (read modes) or stdin pipe (write
    modes).  The child process is not waited on and its exit status is not
    checked here.

    Args:
        path: location handed to ``aws s3 cp`` unchanged.
        mode: must contain "r" or "w"; "b" selects binary pipes.  "a" and "+"
            are rejected.
        encoding: text encoding for non-binary modes; defaults to "utf-8".
            Must be left as None in binary mode.

    Returns:
        A pipe object connected to the ``aws`` child process.

    Raises:
        ValueError: an encoding was supplied together with a binary mode.
        RuntimeError: the mode is not a supported read or write mode.
    """
    from subprocess import PIPE, Popen

    # Explicit raises rather than `assert`: assertions are stripped under
    # `python -O`, which would let unsupported modes through silently.
    if "b" in mode:
        if encoding is not None:
            raise ValueError("binary mode doesn't take an encoding argument")
    elif encoding is None:
        encoding = "utf-8"

    if "a" in mode or "+" in mode:
        raise RuntimeError("invalid mode:{}".format(mode))

    if "r" in mode:
        p = Popen(['aws', 's3', 'cp', path, '-'], stdout=PIPE, encoding=encoding)
        return p.stdout
    elif "w" in mode:
        p = Popen(['aws', 's3', 'cp', '-', path], stdin=PIPE, encoding=encoding)
        return p.stdin
    else:
        raise RuntimeError("invalid mode:{}".format(mode))
# Number of bytes cached from each end of the object by S3File.
CACHE_SIZE=4096 # big enough for front and back caches
# Upper bound on the bytes returned by one S3File.read() call without a length.
MAX_READ=65536*16
# Set to True to print a trace of seeks, reads and aws commands.
debug=False


class S3File:
    """Open an S3 file that can be seeked. This is done by caching to the local file system.

    The object is read-only.  The first and last CACHE_SIZE bytes are fetched
    once in the constructor and served from memory; every other read issues a
    ranged ``aws s3api get-object`` into a named temporary file.

    Attributes set by the constructor: ``name``, ``url``, ``bucket``, ``key``,
    ``fpos`` (current position), ``length`` (object size), ``ETag``,
    ``frontcache``, ``backcache`` and, when the object is larger than
    CACHE_SIZE, ``backcache_start``.
    """

    def __init__(self, name, mode='rb'):
        """Look up the object behind the ``s3://bucket/key`` URL and prime the caches.

        Args:
            name: URL of the object; the scheme must be ``s3``.
            mode: accepted for signature compatibility; not read here.

        Raises:
            RuntimeError: the URL scheme is not ``s3``.
            FileNotFoundError: the listing does not contain exactly this key.
        """
        self.name = name
        self.url = urlparse(name)
        if self.url.scheme != 's3':
            raise RuntimeError("url scheme is {}; expecting s3".format(self.url.scheme))
        self.bucket = self.url.netloc
        self.key = self.url.path[1:]
        self.fpos = 0
        self.tf = tempfile.NamedTemporaryFile()
        cmd = ['aws', 's3api', 'list-objects', '--bucket', self.bucket,
               '--prefix', self.key, '--output', 'json']
        data = json.loads(Popen(cmd, encoding='utf8', stdout=PIPE).communicate()[0])
        # --prefix matches every key that merely starts with self.key, so
        # insist on the exact key rather than trusting the first entry.
        matches = [obj for obj in data.get('Contents', []) if obj.get('Key') == self.key]
        if not matches:
            self.tf.close()
            raise FileNotFoundError(name)
        file_info = matches[0]
        self.length = file_info['Size']
        self.ETag = file_info['ETag']

        # Load the caches
        # front cache: the first CACHE_SIZE bytes (fewer for a smaller object)
        self.frontcache = self._readrange(0, min(CACHE_SIZE, self.length))
        if self.length > CACHE_SIZE:
            self.backcache_start = self.length - CACHE_SIZE
            if debug: print("backcache starts at {}".format(self.backcache_start))
            self.backcache = self._readrange(self.backcache_start, CACHE_SIZE)
        else:
            self.backcache = None

    def _readrange(self, start, length):
        """Fetch ``length`` bytes starting at ``start`` and return them as bytes."""
        # A non-positive length would produce an inverted (invalid) range.
        if length <= 0:
            return b""
        # This is gross; we copy everything to the named temporary file, rather than a pipe
        # because the pipes weren't showing up in /dev/fd/?
        # We probably want to cache also... That's coming
        cmd = ['aws', 's3api', 'get-object', '--bucket', self.bucket, '--key', self.key, '--output', 'json',
               '--range', 'bytes={}-{}'.format(start, start + length - 1), self.tf.name]
        if debug: print(cmd)
        data = json.loads(Popen(cmd, encoding='utf8', stdout=PIPE).communicate()[0])
        if debug: print(data)
        self.tf.seek(0)             # go to the beginning of the data just read
        return self.tf.read(length)  # and read that much

    def __repr__(self):
        return "FakeFile<name:{} url:{}>".format(self.name, self.url)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()
        return False

    def readable(self):
        """Return True: the object supports read()."""
        return True

    def writable(self):
        """Return False: write() always raises."""
        return False

    def seekable(self):
        """Return True: the object supports seek() and tell()."""
        return True

    def read(self, length=-1):
        """Return up to ``length`` bytes from the current position and advance it.

        With ``length`` omitted, None or negative, at most MAX_READ bytes are
        returned, so callers must loop until an empty result to reach the end.
        Requests are clamped to the bytes remaining; at or past the end of the
        object the result is ``b""`` and no remote request is made.
        """
        remaining = max(self.length - self.fpos, 0)
        if length is None or length < 0:
            length = min(MAX_READ, remaining)
        else:
            length = min(length, remaining)
        if debug:
            print("read: fpos={} length={}".format(self.fpos, length))
        if length == 0:
            return b""
        end = self.fpos + length

        # Can we satisfy from the front cache?
        if end <= len(self.frontcache):
            if debug: print("front cache")
            buf = self.frontcache[self.fpos:end]
            self.fpos += len(buf)
            if debug: print("return 1: buf=", buf)
            return buf

        # Can we satisfy from the back cache?
        # (the request is already clamped to the end of the object, so it is
        # enough that it starts inside the cached tail)
        if self.backcache and self.fpos >= self.backcache_start:
            if debug: print("back cache")
            offset = self.fpos - self.backcache_start
            buf = self.backcache[offset:offset + length]
            self.fpos += len(buf)
            if debug: print("return 2: buf=", buf)
            return buf

        buf = self._readrange(self.fpos, length)
        self.fpos += len(buf)
        if debug: print("return 3: buf=", buf)
        return buf

    def seek(self, offset, whence=0):
        """Move the position and return the new absolute position.

        ``whence`` is 0 (from start), 1 (relative to the current position) or
        2 (relative to the end); any other value raises RuntimeError.
        """
        if debug: print("seek({},{})".format(offset, whence))
        if whence == 0:
            self.fpos = offset
        elif whence == 1:
            self.fpos += offset
        elif whence == 2:
            self.fpos = self.length + offset
        else:
            raise RuntimeError("whence={}".format(whence))
        if debug: print("   ={}  (self.length={})".format(self.fpos, self.length))
        return self.fpos

    def tell(self):
        """Return the current absolute position."""
        return self.fpos

    def write(self, data=None):
        """Always raise RuntimeError.

        ``data`` is accepted so that ``write(data)`` reports the intended
        error rather than a TypeError.
        """
        raise RuntimeError("Write not supported")

    def flush(self):
        raise RuntimeError("Flush not supported")

    def close(self):
        """Close (and thereby delete) the temporary file used for ranged reads."""
        self.tf.close()
        return