Memory-efficient large dataset streaming to S3

Posted 2020-07-24 05:03

Question:

I am trying to copy a large dataset (larger than RAM) over to S3 using SQLAlchemy. My constraints are:

  1. I need to use SQLAlchemy
  2. I need to keep memory pressure as low as possible
  3. I don't want to use the local filesystem as an intermediary step to send data to S3

I just want to pipe data from a DB to S3 in a memory-efficient way.

I can do it with normal-sized datasets (using the logic below), but with a larger dataset I hit a buffering issue.

The first problem I solved is that executing a query usually buffers the whole result in memory. I work around that with the fetchmany() method.

import sqlalchemy

engine = sqlalchemy.create_engine(db_url)
# execution_options() returns a new Engine, so the result has to be kept
engine = engine.execution_options(stream_results=True)

results = engine.execute('SELECT * FROM tableX;')
while True:
    chunk = results.fetchmany(10000)  # fetch 10000 rows at a time
    if not chunk:
        break

On the other side, I have a StringIO buffer that I feed with the fetchmany chunks. Then I send its contents to S3.

from io import StringIO
import boto3
import csv

s3_resource = boto3.resource('s3')
csv_buffer = StringIO()
csv_writer = csv.writer(csv_buffer, delimiter=';')
csv_writer.writerows(chunk)
s3_resource.Object(bucket, s3_key).put(Body=csv_buffer.getvalue())

The problem I have is essentially a design issue: how do I make these parts work together? Is it even possible in the same runtime?

engine = sqlalchemy.create_engine(db_url)
engine = engine.execution_options(stream_results=True)
s3_resource = boto3.resource('s3')
csv_buffer = StringIO()
csv_writer = csv.writer(csv_buffer, delimiter=';')

results = engine.execute('SELECT * FROM tableX;')
while True:
    chunk = results.fetchmany(10000)
    if not chunk:
        break
    csv_writer.writerows(chunk)
    s3_resource.Object(bucket, s3_key).put(Body=csv_buffer.getvalue())

I can make it work for one cycle of fetchmany, but not for several. Any ideas?

Answer 1:

I'm assuming that by "make these parts work together" you mean you want a single file in S3 instead of just parts? All you need to do is create a file object that, when read, fetches and buffers the next batch of rows. We can make use of Python's generators:

import csv
from io import StringIO

def _generate_chunks(engine):
    with engine.begin() as conn:
        conn = conn.execution_options(stream_results=True)
        results = conn.execute("SELECT * FROM tableX;")  # the query from the question
        while True:
            chunk = results.fetchmany(10000)
            if not chunk:
                break
            # serialize this batch of rows to CSV in memory and yield it as bytes
            csv_buffer = StringIO()
            csv_writer = csv.writer(csv_buffer, delimiter=';')
            csv_writer.writerows(chunk)
            yield csv_buffer.getvalue().encode("utf-8")

This is a stream of chunks of your file, so all we need to do is to stitch these together (lazily, of course) into a file object:

import io

class CombinedFile(io.RawIOBase):
    def __init__(self, strings):
        self._buffer = b""  # the chunks are bytes, so the buffer must be bytes too
        self._strings = iter(strings)

    def readable(self):
        # advertise the stream as readable so it can be wrapped or consumed
        # like any other binary file object
        return True

    def read(self, size=-1):
        if size < 0:
            return self.readall()
        if not self._buffer:
            try:
                self._buffer = next(self._strings)
            except StopIteration:
                pass  # generator exhausted; returning b"" below signals EOF
        if len(self._buffer) > size:
            ret, self._buffer = self._buffer[:size], self._buffer[size:]
        else:
            ret, self._buffer = self._buffer, b""
        return ret

chunks = _generate_chunks(engine)
file = CombinedFile(chunks)
upload_file_object_to_s3(file)

Streaming the file object to S3 is left as an exercise for the reader. (You can probably use put_object.)
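
A minimal sketch of that last step, with placeholder bucket and key names: it uses boto3's upload_fileobj rather than the put_object mentioned above, because upload_fileobj accepts a file-like object and streams it (as a multipart upload for large objects) without needing the total length up front. Wrapping the stream in io.BufferedReader is an extra assumption here, so that each read() hands the uploader a full-sized block instead of a single generator chunk.

import io
import boto3

# buffer the raw stream so that read(n) returns n bytes (or EOF), not just one chunk
stream = io.BufferedReader(CombinedFile(_generate_chunks(engine)), buffer_size=1024 * 1024)

# "my-bucket" and "export/tableX.csv" are placeholder names
s3_client = boto3.client("s3")
s3_client.upload_fileobj(stream, "my-bucket", "export/tableX.csv")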