Read file line by line with asyncio

Posted 2019-03-11 02:29

I wish to read several log files as they are written and process their input with asyncio. The code will have to run on Windows. From what I understand from searching around both Stack Overflow and the web, asynchronous file I/O is tricky on most operating systems (select will not work as intended, for example). While I'm sure I could do this with other methods (e.g. threads), I thought I would try out asyncio to see what it is like. The most helpful answer would probably be one that describes what the "architecture" of a solution to this problem should look like, i.e. how different functions and coroutines should be called or scheduled.

The following function reads a file line by line (through polling, which is acceptable):

import time

def line_reader(f):
    while True:
        line = f.readline()
        if not line:
            time.sleep(POLL_INTERVAL)
            continue
        process_line(line)

With several files to monitor and process, this sort of code would require threads. I have modified it slightly to be more usable with asyncio:

import asyncio

def line_reader(f):
    while True:
        line = f.readline()
        if not line:
            yield from asyncio.sleep(POLL_INTERVAL)
            continue
        process_line(line)

This sort of works when I schedule it through the asyncio event loop, but if process_line blocks, then that is of course not good. When starting out, I imagined the solution would look something like

def process_data():
    ...
    while True:
        ...
        line = yield from line_reader()
        ...

but I could not figure out how to make that work (at least not without process_data managing quite a bit of state).

Any ideas on how I should structure this kind of code?

4 answers
男人必须洒脱
#2 · 2019-03-11 02:38

Using aiofiles (which delegates the file operations to a thread pool):

import aiofiles

async def main():
    async with aiofiles.open('filename', mode='r') as f:
        async for line in f:
            print(line)

EDIT 1

As @Jashandeep mentioned, you should watch out for blocking operations.

Another method is select and/or epoll:

from select import select

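files_to_read, files_to_write, exceptions = select([f1, f2], [f1, f2], [f1, f2], 0.1)

The timeout argument (the fourth argument, which select accepts only positionally) is important here: without it, select blocks until a descriptor is ready.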

see: https://docs.python.org/3/library/select.html#select.select

EDIT 2

You can register a file descriptor for reading or writing with loop.add_reader() and loop.add_writer().

These use the loop's internal epoll handler.

EDIT 3

But remember that epoll does not work with regular files, and on Windows select accepts only sockets.
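
To illustrate what loop.add_reader() looks like in practice, here is a minimal sketch, assuming a selector-based event loop (the default on Linux and macOS; the Windows proactor loop does not implement add_reader). A socket pair stands in for the descriptor, since a regular file cannot be registered this way. The function names are hypothetical.

```python
import asyncio
import socket


async def read_once_with_add_reader():
    """Wait until a socket is readable via loop.add_reader(), then return its data."""
    loop = asyncio.get_running_loop()
    # A connected socket pair stands in for any selectable descriptor.
    reader_sock, writer_sock = socket.socketpair()
    reader_sock.setblocking(False)
    received = loop.create_future()

    def on_readable():
        # Called by the event loop once reader_sock has data waiting.
        loop.remove_reader(reader_sock)
        received.set_result(reader_sock.recv(1024))

    loop.add_reader(reader_sock, on_readable)
    writer_sock.send(b"hello\n")
    try:
        return await received
    finally:
        reader_sock.close()
        writer_sock.close()


def demo_add_reader():
    return asyncio.run(read_once_with_add_reader())
```

Nothing here polls: the callback runs only when the selector reports the socket as readable. That is exactly the notification regular log files cannot provide.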

我欲成王,谁敢阻挡
#3 · 2019-03-11 02:38

asyncio doesn't support file operations yet, sorry.

Thus it cannot help with your problem.

狗以群分
#4 · 2019-03-11 02:48

"From what I understand from searching around both Stack Overflow and the web, asynchronous file I/O is tricky on most operating systems (select will not work as intended, for example). While I'm sure I could do this with other methods (e.g. threads), I thought I would try out asyncio to see what it is like."

asyncio is select-based on *nix systems under the hood, so you won't be able to do non-blocking file I/O without the use of threads. On Windows, asyncio can use IOCP, which does support non-blocking file I/O, but asyncio does not expose that for files.

Your code is fine, except that you should do blocking I/O calls in threads, so that you don't block the event loop if the I/O is slow. Fortunately, it's really simple to offload work to threads using the loop.run_in_executor function.

First, set up a dedicated thread pool for your I/O:

from concurrent.futures import ThreadPoolExecutor
io_pool_exc = ThreadPoolExecutor()

And then simply offload any blocking I/O calls to the executor:

...
line = yield from loop.run_in_executor(io_pool_exc, f.readline)
...
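
Putting the two fragments above together, a minimal sketch in async/await syntax might look like the following. The names follow, read_n_lines and demo_follow and the POLL_INTERVAL value are hypothetical, and the demo reads a temporary file only to stay self-contained.

```python
import asyncio
import os
import tempfile
from concurrent.futures import ThreadPoolExecutor

POLL_INTERVAL = 0.05  # seconds; hypothetical value


async def follow(f, executor, poll_interval=POLL_INTERVAL):
    """Yield lines from f as they appear; readline() runs in the executor."""
    loop = asyncio.get_running_loop()
    while True:
        # The blocking call runs in a worker thread, not on the event loop.
        line = await loop.run_in_executor(executor, f.readline)
        if line:
            yield line  # may be a partial line if the writer is mid-write
        else:
            await asyncio.sleep(poll_interval)


async def read_n_lines(path, n):
    """Collect the first n lines of path, waiting for them to be written."""
    lines = []
    with ThreadPoolExecutor(max_workers=1) as executor, open(path) as f:
        async for line in follow(f, executor):
            lines.append(line)
            if len(lines) == n:
                break
    return lines


def demo_follow():
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "app.log")
        with open(path, "w") as out:
            out.write("first\nsecond\n")
        return asyncio.run(read_n_lines(path, 2))
```

The event loop stays free while readline runs in the pool, so other coroutines keep making progress even when the disk is slow.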
姐就是有狂的资本
#5 · 2019-03-11 02:55

Your code structure looks good to me; the following code runs fine on my machine:

import asyncio

PERIOD = 0.5

@asyncio.coroutine
def readline(f):
    while True:
        data = f.readline()
        if data:
            return data
        yield from asyncio.sleep(PERIOD)

@asyncio.coroutine
def test():
    with open('test.txt') as f:
        while True:
            line = yield from readline(f)
            print('Got: {!r}'.format(line))

loop = asyncio.get_event_loop()
loop.run_until_complete(test())
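
For newer Python versions, where @asyncio.coroutine has been removed (3.11 and later), the same structure can be sketched with async/await and extended to several files. This is only one possible layout: the watch, collect and demo helpers and the shared asyncio.Queue are assumptions added for illustration, and the file I/O here still blocks the loop briefly on each readline call.

```python
import asyncio
import os
import tempfile

PERIOD = 0.05  # polling interval in seconds


async def readline(f):
    """Return the next line of f, polling until one is available."""
    while True:
        data = f.readline()
        if data:
            return data
        await asyncio.sleep(PERIOD)


async def watch(path, queue):
    """Forward every line that appears in path to the shared queue."""
    with open(path) as f:
        while True:
            await queue.put((path, await readline(f)))


async def collect(paths, expected):
    """Watch all paths concurrently and return the first `expected` items."""
    queue = asyncio.Queue()
    watchers = [asyncio.create_task(watch(p, queue)) for p in paths]
    try:
        return [await queue.get() for _ in range(expected)]
    finally:
        # Stop the watchers and wait for them to close their files.
        for task in watchers:
            task.cancel()
        await asyncio.gather(*watchers, return_exceptions=True)


async def demo():
    with tempfile.TemporaryDirectory() as tmp:
        paths = [os.path.join(tmp, name) for name in ("a.log", "b.log")]
        for p in paths:
            open(p, "w").close()  # create empty log files
        collector = asyncio.create_task(collect(paths, 2))
        await asyncio.sleep(PERIOD)  # let the watchers start polling
        for p in paths:
            with open(p, "a") as out:
                out.write("hello from " + os.path.basename(p) + "\n")
        return sorted(line for _, line in await collector)
```

Each file gets its own watcher task, and a single consumer drains the queue, so the processing code does not need to track per-file state.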