I've been using Tornado for a while now and I've encountered issues with slow timing (which I asked about in this question). One possible issue that was pointed out by a fellow user was that I was using a regular open("...", 'w')
to write to files in my co-routine and that this might be a blocking piece of code.
So my question is, is there a way to do non-blocking file IO in Tornado? I couldn't find anything in my research that fit my needs.
Move all of the code associated with file IO to separate functions decorated with run_on_executor.
import os
import io
from concurrent.futures import ThreadPoolExecutor

from PIL import Image
from tornado import gen, web
from tornado.concurrent import run_on_executor
class UploadHandler(web.RequestHandler):
executor = ThreadPoolExecutor(max_workers=os.cpu_count())
@gen.coroutine
def post(self):
file = self.request.files['file'][0]
try:
thumbnail = yield self.make_thumbnail(file.body)
except OSError:
raise web.HTTPError(400, 'Cannot identify image file')
orig_id, thumb_id = yield [
gridfs.put(file.body, content_type=file.content_type),
gridfs.put(thumbnail, content_type='image/png')]
yield db.imgs.save({'orig': orig_id, 'thumb': thumb_id})
self.redirect('')
@run_on_executor
def make_thumbnail(self, content):
im = Image.open(io.BytesIO(content))
im.convert('RGB')
im.thumbnail((128, 128), Image.ANTIALIAS)
with io.BytesIO() as output:
im.save(output, 'PNG')
return output.getvalue()
I'm providing another answer because, as it turns out, reading/writing the whole file in a separate thread does not work for large files. You cannot receive or send the full contents of a big file in one chunk, because you may not have enough memory.
For me, it was not trivial to find out how to block the reader/writer thread when the chunk processor in the ioloop's main thread is not able to keep up with its speed. The implementation below works efficiently both when the file read operation is much faster than the chunk processor and when it is the slower one. Synchronization is realized by the combination of an async queue and a lock, and it does not block the ioloop's thread in any way.
The lock is only RELEASED in the loop's thread; it is never acquired there, so there is no race condition.
I do not expect this to be accepted as an answer, but since it took me a while to figure out, I guess it may help others in their implementations.
This can be generalized not just for file read/write operations, but for any consumer/producer pair that has one side in a separate thread and the other side in the ioloop.
import os
import time
import threading
from concurrent.futures import ThreadPoolExecutor
from tornado.ioloop import IOLoop
from tornado.queues import Queue
def read_file(file_path, queue: Queue, io_loop: IOLoop, chunk_size: int = 64 * 1024):
file_size = os.path.getsize(file_path)
remaining = file_size
fin = open(file_path, "rb")
lock = threading.Lock()
def putter(chunk, lock: threading.Lock):
queue.put(chunk) # Called from the loop's thread -> can block
lock.release() # Awake reader thread after the chunk has been put into the processing queue
def put(chunk, lock):
"""Put the chunk into the queue, and wait until it is processed by the ioloop"""
lock.acquire() # Acquire in this thread
io_loop.spawn_callback(putter, chunk, lock) # Release in the loop's thread
lock.acquire() # Wait until the loop's thread has accepted the chunk for processing
lock.release() # Cleanup before return
# Put the file size into the queue without waiting
io_loop.spawn_callback(queue.put, file_size)
while remaining > 0:
chunk = fin.read(min(chunk_size, remaining))
print("read", chunk)
remaining -= len(chunk)
time.sleep(1) # Just for testing: simulate slow file reads.
put(chunk, lock)
# Put EOF/terminator into the queue
io_loop.spawn_callback(queue.put, None)
pool = ThreadPoolExecutor(3)
async def main():
# Create a queue for sending chunks of data
cq = Queue(maxsize=3)
# Start the reader thread that reads in a separate thread
pool.submit(read_file, __file__, cq, io_loop, 100)
file_size = await cq.get()
print("file size:", file_size)
# Process chunks
while True:
item = await cq.get()
# Terminator -> EOF
if item is None:
break
print("got chunk:", repr(item))
io_loop.stop()
if __name__ == '__main__':
io_loop = IOLoop.current()
io_loop.run_sync(main)
io_loop.start()