In Tornado, how to do non-blocking file read/write

Published 2019-06-03 08:41

Question:

I've been using Tornado for a while now and I've encountered issues with slow timing (which I asked about in this question). One possible issue pointed out by a fellow user was that I was using a regular open("...", 'w') to write to files inside my coroutine, and that this might be a blocking piece of code.

So my question is: is there a way to do non-blocking file IO in Tornado? I couldn't find anything in my research that fits my needs.

Answer 1:

Move all of the code associated with file IO into separate methods decorated with run_on_executor, so that it runs on a thread pool instead of blocking the IOLoop.

import os
import io
from concurrent.futures import ThreadPoolExecutor

from PIL import Image
from tornado import gen, web
from tornado.concurrent import run_on_executor

class UploadHandler(web.RequestHandler): 
    executor = ThreadPoolExecutor(max_workers=os.cpu_count()) 

    @gen.coroutine 
    def post(self): 
        file = self.request.files['file'][0] 
        try: 
            thumbnail = yield self.make_thumbnail(file.body) 
        except OSError: 
            raise web.HTTPError(400, 'Cannot identify image file') 
        orig_id, thumb_id = yield [ 
            gridfs.put(file.body, content_type=file.content_type), 
            gridfs.put(thumbnail, content_type='image/png')] 
        yield db.imgs.save({'orig': orig_id, 'thumb': thumb_id}) 
        self.redirect('') 

    @run_on_executor 
    def make_thumbnail(self, content): 
        im = Image.open(io.BytesIO(content))
        im = im.convert('RGB')  # convert() returns a new image, so reassign it
        im.thumbnail((128, 128), Image.ANTIALIAS) 
        with io.BytesIO() as output: 
            im.save(output, 'PNG') 
            return output.getvalue()
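
The thumbnail example above covers image processing, but the same pattern also answers the original question about plain file writes. Below is a minimal sketch of that case; the SaveHandler name and the target path are hypothetical, chosen only for illustration.

import os
from concurrent.futures import ThreadPoolExecutor

from tornado import gen, web
from tornado.concurrent import run_on_executor


class SaveHandler(web.RequestHandler):
    # Worker threads that run the blocking file IO
    executor = ThreadPoolExecutor(max_workers=os.cpu_count())

    @gen.coroutine
    def post(self):
        # The blocking open()/write() runs on an executor thread,
        # so the IOLoop stays free to serve other requests.
        yield self.save_body('/tmp/upload.bin', self.request.body)
        self.finish('saved')

    @run_on_executor
    def save_body(self, path, data):
        # Runs on a worker thread from self.executor
        with open(path, 'wb') as f:
            f.write(data)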


Answer 2:

I'm providing another answer because, as it turns out, reading/writing the whole file in a separate thread does not work for large files: you cannot receive or send the full contents of a big file in one chunk, because you may not have enough memory.

For me, it was not trivial to find out how to block the reader/writer thread when the chunk processor on the ioloop's main thread cannot keep up. The implementation below works efficiently both when the file read operation is much faster than the chunk processor and when it is the slower one. Synchronization is achieved by combining an async queue with a lock, and it does not block the ioloop's thread in any way.

The lock is only released in the loop's thread; it is never acquired there, so there is no race condition.

I do not expect this to be accepted as the answer, but since it took me a while to figure out, I hope it helps others with their implementations.

This can be generalized beyond file read/write operations to any producer/consumer pair that has one side in a separate thread and the other side in the ioloop.

import os
import time
import threading
from concurrent.futures import ThreadPoolExecutor
from tornado.ioloop import IOLoop
from tornado.queues import Queue


def read_file(file_path, queue: Queue, io_loop: IOLoop, chunk_size: int = 64 * 1024):
    file_size = os.path.getsize(file_path)
    remaining = file_size
    lock = threading.Lock()

    async def putter(chunk, lock: threading.Lock):
        # Runs on the loop's thread; waits here while the bounded queue is full
        await queue.put(chunk)
        # Wake the reader thread only after the chunk has entered the processing queue
        lock.release()

    def put(chunk, lock):
        """Put the chunk into the queue, and wait until it is processed by the ioloop"""
        lock.acquire()  # Acquire in this thread
        io_loop.spawn_callback(putter, chunk, lock) # Release in the loop's thread
        lock.acquire()  # Wait until the loop's thread has accepted the chunk for processing
        lock.release()  # Cleanup before return

    # Put the file size into the queue without waiting
    io_loop.spawn_callback(queue.put, file_size)

    with open(file_path, "rb") as fin:
        while remaining > 0:
            chunk = fin.read(min(chunk_size, remaining))
            print("read", chunk)
            remaining -= len(chunk)
            time.sleep(1)  # Just for testing: simulate slow file reads.
            put(chunk, lock)

    # Put EOF/terminator into the queue
    io_loop.spawn_callback(queue.put, None)


pool = ThreadPoolExecutor(3)


async def main():
    # Create a bounded queue for passing chunks of data to this coroutine
    cq = Queue(maxsize=3)
    # Start the reader in a separate thread
    pool.submit(read_file, __file__, cq, IOLoop.current(), 100)
    file_size = await cq.get()
    print("file size:", file_size)
    # Process chunks until the terminator arrives
    while True:
        item = await cq.get()
        # Terminator -> EOF
        if item is None:
            break
        print("got chunk:", repr(item))


if __name__ == '__main__':
    IOLoop.current().run_sync(main)
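
The code above handles the read direction (a worker thread producing, the ioloop consuming). For the write direction, where the ioloop produces chunks and a worker thread writes them to disk, one simpler option is sketched below; it is an assumption on my part rather than part of the original answer, and it swaps the async-queue-plus-lock mechanism for a bounded standard-library queue, with the potentially blocking put() calls dispatched through IOLoop.run_in_executor so the loop is never blocked. The write_file name, writer_pool, and the chunk source are placeholders for illustration.

import asyncio
import queue as stdlib_queue
from concurrent.futures import ThreadPoolExecutor

from tornado.ioloop import IOLoop

writer_pool = ThreadPoolExecutor(3)


async def write_file(file_path, chunks):
    """Feed chunks produced on the ioloop to a writer thread without blocking the loop."""
    q = stdlib_queue.Queue(maxsize=3)  # bounded, so it provides back-pressure

    def writer():
        # Runs in a worker thread; blocking file IO is fine here.
        with open(file_path, "wb") as fout:
            while True:
                chunk = q.get()    # blocks this worker thread, not the IOLoop
                if chunk is None:  # terminator -> EOF
                    break
                fout.write(chunk)

    writer_future = writer_pool.submit(writer)
    loop = IOLoop.current()
    for chunk in chunks:
        # q.put() may block when the queue is full, so run it on the executor
        await loop.run_in_executor(writer_pool, q.put, chunk)
    # Send the terminator and wait for the writer thread to finish
    await loop.run_in_executor(writer_pool, q.put, None)
    await asyncio.wrap_future(writer_future)


# Example usage from a coroutine running on the IOLoop:
#   await write_file("/tmp/out.bin", [b"a" * 10, b"b" * 10])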