I am trying to copy a large dataset (larger than RAM) from a database to S3 using SQLAlchemy. My constraints are:
- I need to use SQLAlchemy
- I need to keep memory pressure as low as possible
- I don't want to use the local filesystem as an intermediate step when sending data to S3
I just want to pipe data from a DB to S3 in a memory-efficient way.
I can do it with normal-sized datasets (using the logic below), but with larger datasets I hit a buffering issue.
The first problem I solved is that executing a query usually buffers the whole result in memory, so I enable `stream_results` and read the rows in chunks with the `fetchmany()` method:
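```python
import sqlalchemy

engine = sqlalchemy.create_engine(db_url)
with engine.connect() as conn:
    # execution_options() returns a new connection object; it does not modify
    # the engine in place. stream_results=True requests a server-side cursor.
    result = conn.execution_options(stream_results=True).execute(
        sqlalchemy.text('SELECT * FROM tableX;'))
    while True:
        chunk = result.fetchmany(10000)
        if not chunk:
            break
```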
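The `fetchmany()` loop above can be wrapped in a generator so the consumer only ever holds one chunk at a time. This is a minimal sketch, assuming the result object exposes a DB-API style `fetchmany()` (SQLAlchemy's result does); the helper name `iter_chunks` is hypothetical, and the standard-library `sqlite3` cursor is used only as a stand-in so the example runs without a real database.

```python
import sqlite3
from typing import Any, Iterator, Sequence


def iter_chunks(result: Any, size: int = 10000) -> Iterator[Sequence[Any]]:
    """Yield lists of rows from any object with a DB-API style fetchmany()."""
    while True:
        chunk = result.fetchmany(size)
        if not chunk:  # an empty list means the cursor is exhausted
            break
        yield chunk


# Stand-in for a streaming SQLAlchemy result: an in-memory sqlite3 table.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE tableX (id INTEGER, name TEXT)")
conn.executemany("INSERT INTO tableX VALUES (?, ?)",
                 [(i, f"row{i}") for i in range(25)])
cursor = conn.execute("SELECT * FROM tableX")
chunk_sizes = [len(c) for c in iter_chunks(cursor, size=10)]
print(chunk_sizes)  # [10, 10, 5]
```

SQLAlchemy 1.4 and later also provide `Result.partitions(size)`, which yields row chunks in the same way.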
On the other side, I have a `StringIO` buffer that I feed with each `fetchmany()` chunk, and then I send its contents to S3:
```python
import csv
from io import StringIO

import boto3

s3_resource = boto3.resource('s3')
csv_buffer = StringIO()
csv_writer = csv.writer(csv_buffer, delimiter=';')
csv_writer.writerows(chunk)  # chunk comes from the fetchmany() loop above
# upload the whole buffer as a single S3 object
s3_resource.Object(bucket, s3_key).put(Body=csv_buffer.getvalue())
```
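Because the `StringIO` buffer is never emptied, it grows with every chunk written to it. A minimal sketch of a bounded alternative, assuming UTF-8 output and the `;` delimiter from the question: it drains the buffer after each chunk and yields byte blobs once they reach a threshold, sized for S3 multipart uploads, where every part except the last must be at least 5 MiB. The function name `csv_parts` is hypothetical.

```python
import csv
import io
from typing import Any, Iterable, Iterator, Sequence

MIN_PART_SIZE = 5 * 1024 * 1024  # S3 minimum size for every part except the last


def csv_parts(chunks: Iterable[Sequence[Any]], part_size: int = MIN_PART_SIZE,
              delimiter: str = ";") -> Iterator[bytes]:
    """Turn row chunks into CSV byte blobs of at least part_size bytes.

    Only the final blob may be smaller than part_size.
    """
    buffer = io.StringIO()
    writer = csv.writer(buffer, delimiter=delimiter)
    pending = bytearray()
    for chunk in chunks:
        writer.writerows(chunk)
        # move the text written so far out of the StringIO, then empty it
        pending += buffer.getvalue().encode("utf-8")
        buffer.seek(0)
        buffer.truncate(0)
        if len(pending) >= part_size:
            yield bytes(pending)
            pending.clear()
    if pending:  # flush the remainder as the (possibly small) last part
        yield bytes(pending)


parts = list(csv_parts([[(1, "a"), (2, "b")], [(3, "c")]], part_size=8))
print(parts)  # [b'1;a\r\n2;b\r\n', b'3;c\r\n']
```

At most one chunk of rows plus roughly one part's worth of bytes is held at a time, independent of the table size.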
The problem is essentially a design issue: how do I make these two parts work together? Is it even possible in the same runtime? Here is my combined attempt:
```python
import csv
from io import StringIO

import boto3
import sqlalchemy

engine = sqlalchemy.create_engine(db_url)
s3_resource = boto3.resource('s3')
csv_buffer = StringIO()
csv_writer = csv.writer(csv_buffer, delimiter=';')

with engine.connect() as conn:
    result = conn.execution_options(stream_results=True).execute(
        sqlalchemy.text('SELECT * FROM tableX;'))
    while True:
        chunk = result.fetchmany(10000)
        if not chunk:
            break
        csv_writer.writerows(chunk)
        # put() replaces the object at s3_key on every iteration, and
        # csv_buffer is never cleared, so it keeps growing in memory
        s3_resource.Object(bucket, s3_key).put(Body=csv_buffer.getvalue())
```
It works for a single `fetchmany()` cycle, but not for several. Any ideas?
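One way to let several chunks end up in a single object without ever holding the whole file is an S3 multipart upload: each piece is sent with `upload_part` and S3 assembles the object when the upload is completed. This is a sketch, not a definitive implementation, assuming a boto3 S3 *client* (`boto3.client('s3')`, not the resource used above) and an iterable `parts` of byte strings, each at least 5 MiB except the last (S3 also caps an upload at 10,000 parts). The function name `upload_stream` is hypothetical.

```python
from typing import Any, Iterable


def upload_stream(s3_client: Any, bucket: str, key: str,
                  parts: Iterable[bytes]) -> None:
    """Upload byte parts to s3://bucket/key as one object via multipart upload."""
    upload = s3_client.create_multipart_upload(Bucket=bucket, Key=key)
    upload_id = upload["UploadId"]
    completed = []
    try:
        for number, body in enumerate(parts, start=1):  # part numbers start at 1
            response = s3_client.upload_part(
                Bucket=bucket, Key=key, PartNumber=number,
                UploadId=upload_id, Body=body,
            )
            completed.append({"ETag": response["ETag"], "PartNumber": number})
        if not completed:
            # S3 rejects completing an upload with zero parts, so write an empty object
            s3_client.abort_multipart_upload(Bucket=bucket, Key=key, UploadId=upload_id)
            s3_client.put_object(Bucket=bucket, Key=key, Body=b"")
            return
        s3_client.complete_multipart_upload(
            Bucket=bucket, Key=key, UploadId=upload_id,
            MultipartUpload={"Parts": completed},
        )
    except Exception:
        # abort so S3 does not keep (and bill for) orphaned parts
        s3_client.abort_multipart_upload(Bucket=bucket, Key=key, UploadId=upload_id)
        raise
```

Since `parts` can be a lazy generator (for example, CSV text encoded from successive `fetchmany()` chunks), the database read and the upload run in the same process, with only the current part in memory.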