I am getting used to asyncio and find the task handling quite nice, but it can be difficult to mix async code with traditional (blocking) I/O libraries. The problem I am currently facing is how to properly decode an async StreamReader.
The simplest solution is to read() chunks of bytes and then decode each chunk; see the code below. (In my program I wouldn't print each chunk, but decode it into a string and pass it to another method for processing):
import asyncio
import aiohttp

async def get_data(port):
    url = 'http://localhost:{}/'.format(port)
    r = await aiohttp.get(url)
    stream = r.content
    while not stream.at_eof():
        data = await stream.read(4)
        print(data.decode('utf-8'))
This works fine until there is a UTF-8 character that is split between two chunks. For example, if the response is b'M\xc3\xa4dchen mit Bi\xc3\x9f\n', then reading chunks of 3 will work, but chunks of 4 will not, since \xc3 and \x9f end up in different chunks, and decoding the chunk that ends with \xc3 raises the following error:
UnicodeDecodeError: 'utf-8' codec can't decode byte 0xc3 in position 3: unexpected end of data
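The error is easy to reproduce in isolation (a tiny demo of my own, not from the original post), because with 4-byte chunks the fourth chunk of the example response ends with the first byte of a two-byte sequence:

chunk = b' Bi\xc3'       # fourth 4-byte chunk of b'M\xc3\xa4dchen mit Bi\xc3\x9f\n'
chunk.decode('utf-8')    # raises the UnicodeDecodeError shown above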
I looked for proper solutions to this problem, and at least in the blocking world the answer seems to be either io.TextIOWrapper or codecs.StreamReaderWriter (the differences between them are discussed in PEP 400). However, both of these rely on conventional blocking streams.
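For comparison, here is roughly how those two options are used with a blocking binary stream (a sketch; the file name 'data.bin' is made up):

import codecs
import io

with open('data.bin', 'rb') as raw:
    text = io.TextIOWrapper(raw, encoding='utf-8')
    # or: text = codecs.getreader('utf-8')(raw)
    # Both decode incrementally and handle characters that span reads.
    print(text.read(4))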
I spent 30 minutes searching for asyncio examples and kept finding only my decode() solution above. Does anyone know of a better solution, or is this a missing feature in Python's asyncio?
For reference, here are the results from using the two "standard" decoders with async streams.
Using the codec stream reader:
r = yield from aiohttp.get(url)
decoder = codecs.getreader('utf-8')
stream = decoder(r.content)
Exception:
File "echo_client.py", line 13, in get_data
data = yield from stream.read(4)
File "/usr/lib/python3.5/codecs.py", line 497, in read
data = self.bytebuffer + newdata
TypeError: can't concat bytes to generator
(the codecs reader calls read() on the wrapped stream directly, rather than using yield from or await on it)
I also tried wrapping the stream with io.TextIOWrapper:
stream = TextIOWrapper(r.content)
But that leads to the following:
File "echo_client.py", line 10, in get_data
stream = TextIOWrapper(r.content)
AttributeError: 'FlowControlStreamReader' object has no attribute 'readable'
P.S. If you want a sample test case for this, please look at this gist. You can run it with python3.5 to reproduce the error. If you change the chunk size from 4 to 3 (or 30), it will work correctly.
EDIT
The accepted answer worked like a charm. Thanks! If someone else runs into this issue, here is a simple wrapper class I wrote to handle the decoding on top of a StreamReader:
import codecs

class DecodingStreamReader:
    def __init__(self, stream, encoding='utf-8', errors='strict'):
        self.stream = stream
        self.decoder = codecs.getincrementaldecoder(encoding)(errors=errors)

    async def read(self, n=-1):
        data = await self.stream.read(n)
        if isinstance(data, (bytes, bytearray)):
            data = self.decoder.decode(data)
        return data

    def at_eof(self):
        return self.stream.at_eof()
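Used in place of the raw response stream, the original loop stays almost the same (a usage sketch, reusing the DecodingStreamReader class defined above):

import aiohttp

async def get_data(port):
    url = 'http://localhost:{}/'.format(port)
    r = await aiohttp.get(url)
    stream = DecodingStreamReader(r.content)
    while not stream.at_eof():
        data = await stream.read(4)    # already decoded to str
        print(data, end='')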
From the accepted answer: you can use an IncrementalDecoder.
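A minimal illustration of what the incremental decoder does (my own demo, not necessarily the answer's original snippet): it buffers a trailing partial character and emits it once the remaining bytes arrive:

import codecs

d = codecs.getincrementaldecoder('utf-8')()
print(repr(d.decode(b' Bi\xc3')))   # ' Bi'  (the lone \xc3 is buffered)
print(repr(d.decode(b'\x9f\n')))    # 'ß\n'  (buffered byte is now completed)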
With your example:
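A sketch of the approach applied to the question's get_data coroutine, keeping the 4-byte chunks:

import asyncio
import codecs

import aiohttp

async def get_data(port):
    url = 'http://localhost:{}/'.format(port)
    r = await aiohttp.get(url)
    stream = r.content
    decoder = codecs.getincrementaldecoder('utf-8')()
    while not stream.at_eof():
        data = await stream.read(4)
        print(decoder.decode(data), end='')
    print(decoder.decode(b'', final=True), end='')   # flush any buffered bytes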
Output:
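With the example response b'M\xc3\xa4dchen mit Bi\xc3\x9f\n' and the sketch above, the decoded output is:

Mädchen mit Biß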