MemoryError when Using the read() Method in Reading a Large JSON File from Amazon S3

Posted 2019-08-04 07:17

Question:

I'm trying to import a large JSON file from Amazon S3 into AWS RDS PostgreSQL using Python, but I get this error:

Traceback (most recent call last):
  File "my_code.py", line 67, in <module>
    file_content = obj['Body'].read().decode('utf-8').splitlines(True)
  File "/home/user/asd-to-qwe/fgh-to-hjk/env/local/lib/python3.6/site-packages/botocore/response.py", line 76, in read
    chunk = self._raw_stream.read(amt)
  File "/home/user/asd-to-qwe/fgh-to-hjk/env/local/lib/python3.6/site-packages/botocore/vendored/requests/packages/urllib3/response.py", line 239, in read
    data = self._fp.read()
  File "/usr/lib64/python3.6/http/client.py", line 462, in read
    s = self._safe_read(self.length)
  File "/usr/lib64/python3.6/http/client.py", line 617, in _safe_read
    return b"".join(s)
MemoryError

# my_code.py

import sys
import boto3
import psycopg2
import zipfile
import io
import json

s3 = boto3.client('s3', aws_access_key_id=<aws_access_key_id>, aws_secret_access_key=<aws_secret_access_key>)
connection = psycopg2.connect(host=<host>, dbname=<dbname>, user=<user>, password=<password>)
cursor = connection.cursor()

bucket = sys.argv[1]
key = sys.argv[2]
obj = s3.get_object(Bucket=bucket, Key=key)

def insert_query(data):
    query = """
        INSERT INTO data_table
        SELECT
            (src.test->>'url')::varchar, (src.test->>'id')::bigint,
            (src.test->>'external_id')::bigint, (src.test->>'via')::jsonb
        FROM (SELECT CAST(%s AS JSONB) AS test) src
    """
    cursor.execute(query, (json.dumps(data),))


if key.endswith('.zip'):
    zip_files = obj['Body'].read()
    with io.BytesIO(zip_files) as zf:
        zf.seek(0)
        with zipfile.ZipFile(zf, mode='r') as z:
            for filename in z.namelist():
                with z.open(filename) as f:
                    for line in f:
                        insert_query(json.loads(line.decode('utf-8')))
if key.endswith('.json'):
    file_content = obj['Body'].read().decode('utf-8').splitlines(True)
    for line in file_content:
        insert_query(json.loads(line))


connection.commit()
connection.close()

Is there any solution to this problem? Any help would be appreciated, thank you so much!

Answer 1:

Significant savings can be had by not slurping your whole input file into memory as a list of lines.

Specifically, these lines are terrible for memory usage: they involve a peak memory footprint of a bytes object the size of your whole file, plus a list of lines holding the complete contents of the file as well:

file_content = obj['Body'].read().decode('utf-8').splitlines(True)
for line in file_content:

For a 1 GB ASCII text file with 5 million lines, on 64 bit Python 3.3+, that's a peak memory requirement of roughly 2.3 GB for just the bytes object, the list, and the individual strs in the list. A program that needs 2.3x as much RAM as the size of the files it processes won't scale to large files.
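Roughly, the accounting behind that 2.3 GB figure looks like this (a back-of-the-envelope sketch; the per-object overheads below are approximations and vary by CPython build):

# Rough accounting for the peak memory of
#   obj['Body'].read().decode('utf-8').splitlines(True)
# on a 1 GB ASCII file with 5 million lines (64-bit CPython 3.3+).
file_size   = 1_000_000_000        # bytes object returned by read()
line_count  = 5_000_000

line_data   = file_size            # compact ASCII strs store ~1 byte per char
str_headers = line_count * 50      # ~50 bytes of per-str object overhead (approximate)
list_slots  = line_count * 8       # one 8-byte pointer per list element

peak = file_size + line_data + str_headers + list_slots
print(f"~{peak / 1e9:.2f} GB")     # ≈ 2.29 GB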

To fix, change that original code to:

file_content = io.TextIOWrapper(obj['Body'], encoding='utf-8')
for line in file_content:

Given that obj['Body'] appears to be usable for lazy streaming, this should remove both copies of the complete file data from memory. Using TextIOWrapper means obj['Body'] is lazily read and decoded in chunks (of a few KB at a time), and the lines are iterated lazily as well; this reduces memory demands to a small, largely fixed amount (the peak memory cost depends on the length of the longest line), regardless of file size.
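For context, here is a minimal sketch of how the .json branch from the question might look with this change, reusing the insert_query helper and the io/json imports already in the script, and assuming obj['Body'] can be wrapped directly (see the update below if it can't):

if key.endswith('.json'):
    # Wrap the streaming body so it is read and decoded lazily, in chunks,
    # instead of loading the whole file into memory at once.
    file_content = io.TextIOWrapper(obj['Body'], encoding='utf-8')
    for line in file_content:
        insert_query(json.loads(line))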

Update:

It looks like StreamingBody doesn't implement the io.BufferedIOBase ABC. It does have its own documented API, though, which can be used for a similar purpose. If you can't make TextIOWrapper do the work for you (it's simpler and more efficient if it can be made to work), an alternative would be to do:

file_content = (line.decode('utf-8') for line in obj['Body'].iter_lines())
for line in file_content:

Unlike TextIOWrapper, it doesn't benefit from decoding blocks in bulk (each line is decoded individually), but it should otherwise achieve the same reduction in memory usage.
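Plugged into the question's script, that alternative .json branch might look like the following sketch (again reusing insert_query; StreamingBody.iter_lines() handles the chunked reads and line splitting):

if key.endswith('.json'):
    # iter_lines() yields raw bytes one line at a time,
    # reading the body in chunks rather than all at once.
    for raw_line in obj['Body'].iter_lines():
        insert_query(json.loads(raw_line.decode('utf-8')))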