I have an asyncio.Protocol subclass receiving data from a server.
I am storing this data (each line, because the data is text) in an asyncio.Queue.
import asyncio

q = asyncio.Queue()


class StreamProtocol(asyncio.Protocol):
    def __init__(self, loop):
        self.loop = loop
        self.transport = None

    def connection_made(self, transport):
        self.transport = transport

    def data_received(self, data):
        # data_received is a plain callback, not a coroutine, so use the
        # non-blocking put_nowait (a yield here would turn it into a generator).
        for message in data.decode().splitlines():
            q.put_nowait(message.rstrip())

    def connection_lost(self, exc):
        self.loop.stop()


loop = asyncio.get_event_loop()
coro = loop.create_connection(lambda: StreamProtocol(loop),
                              '127.0.0.1', '42')
loop.run_until_complete(coro)
loop.run_forever()
loop.close()
I want to have another coroutine responsible for consuming the data in the queue and processing it.
- Should this be an asyncio.Task?
- What if the queue becomes empty because no data is received for a few seconds? How can I make sure my consumer doesn't stop (run_until_complete)?
- Is there a cleaner way than using a global variable for my queue?
Yes, create it using asyncio.ensure_future or loop.create_task.
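A minimal sketch of scheduling the consumer as a task. The `consume` and `main` names and the `None` sentinel used to end the demo are assumptions made for illustration, not part of the question's code:

```python
import asyncio


async def consume(queue):
    """Hypothetical consumer: collect items until a None sentinel arrives."""
    seen = []
    while True:
        item = await queue.get()
        if item is None:  # the sentinel is an assumption of this sketch
            return seen
        seen.append(item)


async def main():
    queue = asyncio.Queue()
    # Wrap the coroutine in a Task so it runs concurrently with the producer.
    # loop.create_task(consume(queue)) would work the same way here.
    task = asyncio.ensure_future(consume(queue))
    for line in ("a", "b"):
        await queue.put(line)
    await queue.put(None)
    return await task
```

Running `asyncio.run(main())` returns the items in the order they were queued.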
Simply use queue.get to wait until an item is available:
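As a rough illustration, the consumer below stays suspended on `queue.get()` during a quiet period and resumes when the next item arrives. The `demo` helper and its short sleeps are only there to simulate a pause in incoming data:

```python
import asyncio


async def consume(queue, results):
    while True:
        # Suspends here while the queue is empty; the task does not finish.
        item = await queue.get()
        results.append(item)


async def demo():
    queue = asyncio.Queue()
    results = []
    consumer = asyncio.ensure_future(consume(queue, results))

    await queue.put("first")
    await asyncio.sleep(0.05)            # simulated quiet period, queue is empty
    still_waiting = not consumer.done()  # consumer is parked on queue.get()

    await queue.put("second")
    await asyncio.sleep(0.01)            # give the consumer a chance to run

    consumer.cancel()                    # stop the endless loop for this demo
    await asyncio.gather(consumer, return_exceptions=True)
    return results, still_waiting
```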
Yes, simply pass it as argument to the consumer coroutine and stream protocol:
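One way this could look, keeping the question's class and adding a `queue` constructor parameter (the parameter name is my choice). Note that this sketch, like the original, assumes each chunk passed to `data_received` contains whole lines:

```python
import asyncio


class StreamProtocol(asyncio.Protocol):
    def __init__(self, loop, queue):
        self.loop = loop
        self.queue = queue  # injected instead of read from a global
        self.transport = None

    def connection_made(self, transport):
        self.transport = transport

    def data_received(self, data):
        # Plain callback: put_nowait never blocks on an unbounded queue.
        for message in data.decode().splitlines():
            self.queue.put_nowait(message.rstrip())

    def connection_lost(self, exc):
        self.loop.stop()
```

The same queue object is then handed to the consumer, for example `consume(queue)`, and to the protocol factory, for example `lambda: StreamProtocol(loop, queue)`.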
Once the connection is closed, use queue.join to wait until the queue is empty.
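For `queue.join()` to return, the consumer has to call `queue.task_done()` once per item. A small sketch, where `drain_after_close` is a hypothetical stand-in for the point at which the connection has closed and the uppercase conversion is placeholder processing:

```python
import asyncio


async def consume(queue, processed):
    while True:
        item = await queue.get()
        processed.append(item.upper())  # placeholder processing step
        queue.task_done()               # lets queue.join() track completion


async def drain_after_close(lines):
    queue = asyncio.Queue()
    processed = []
    consumer = asyncio.ensure_future(consume(queue, processed))

    for line in lines:          # stands in for data arriving via data_received
        queue.put_nowait(line)

    # Stands in for "the connection is closed": wait for the backlog to finish.
    await queue.join()

    consumer.cancel()           # nothing left to process, stop the consumer
    await asyncio.gather(consumer, return_exceptions=True)
    return processed
```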
Full example:
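The following is a sketch that combines the pieces above; it is one possible arrangement rather than the only one. It assumes Python 3.7+ and uses `asyncio.run` with a future resolved in `connection_lost` instead of `loop.stop()`. The `run_client` function, the `closed` future and the `handle` callback are names introduced for illustration:

```python
import asyncio


class StreamProtocol(asyncio.Protocol):
    def __init__(self, queue, closed):
        self.queue = queue
        self.closed = closed  # future resolved when the connection is lost
        self.transport = None

    def connection_made(self, transport):
        self.transport = transport

    def data_received(self, data):
        for message in data.decode().splitlines():
            self.queue.put_nowait(message.rstrip())

    def connection_lost(self, exc):
        if not self.closed.done():
            self.closed.set_result(exc)


async def consume(queue, handle):
    while True:
        item = await queue.get()  # waits while the queue is empty
        handle(item)
        queue.task_done()


async def run_client(host, port, handle=print):
    loop = asyncio.get_running_loop()
    queue = asyncio.Queue()
    closed = loop.create_future()
    consumer = asyncio.ensure_future(consume(queue, handle))

    await loop.create_connection(
        lambda: StreamProtocol(queue, closed), host, port)

    await closed        # the connection has been closed
    await queue.join()  # let the consumer finish the backlog
    consumer.cancel()
    await asyncio.gather(consumer, return_exceptions=True)
```

With the address from the question, this would be started with `asyncio.run(run_client('127.0.0.1', 42))`.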
Alternatively, you can also use streams:
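A sketch of the streams variant, assuming Python 3.7+. Here `asyncio.open_connection` returns a reader and writer pair, and `readline` takes care of splitting the data into lines, so no separate queue is needed. The `read_lines` name and the `handle` callback are hypothetical:

```python
import asyncio


async def read_lines(host, port, handle=print):
    reader, writer = await asyncio.open_connection(host, port)
    try:
        while True:
            line = await reader.readline()  # waits until a full line arrives
            if not line:                    # b"" signals end of stream
                break
            handle(line.decode().rstrip())
    finally:
        writer.close()
        await writer.wait_closed()
```

With the address from the question, this would be started with `asyncio.run(read_lines('127.0.0.1', 42))`.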