Running several ApplicationSessions non-blockingly

2020-03-24 23:43发布

I'm trying to run two autobahn.asyncio.wamp.ApplicationSessions in python at the same time. Previously, I did this using a modification of the autobahn library as suggested in this post's answer. I now require a bit more professional solution.

After googling about for a while, this post appeared quite promising, but uses the twisted library, instead of asyncio. I wasn't able to identify a similar solution for the asyncio branch of the autobahn library, since it doesn't appear to be using Reactors.

The main problem I have, is that ApplicationRunner.run() is blocking (which is why I previously outsourced it to a thread), so I can't just run a second ApplicationRunner after it.

I do need to access 2 websocket channels at the same time, which I cannot appear to do with a single ApplicationSession.

My Code so far:

from autobahn.asyncio.wamp import ApplicationSession
from autobahn.asyncio.wamp import ApplicationRunner
from asyncio import coroutine
import time


channel1 = 'BTC_LTC'
channel2 = 'BTC_XMR'

class LTCComponent(ApplicationSession):

    def onConnect(self):
        self.join(self.config.realm)

    @coroutine
    def onJoin(self, details):
        def onTicker(*args, **kwargs):
            print('LTCComponent', args, kwargs)

        try:
            yield from self.subscribe(onTicker, channel1)
        except Exception as e:
            print("Could not subscribe to topic:", e)

class XMRComponent(ApplicationSession):

    def onConnect(self):
        self.join(self.config.realm)

    @coroutine
    def onJoin(self, details):
        def onTicker(*args, **kwargs):
            print('XMRComponent', args, kwargs)

        try:
            yield from self.subscribe(onTicker, channel2)
        except Exception as e:
            print("Could not subscribe to topic:", e)

def main():
    runner = ApplicationRunner("wss://api.poloniex.com:443", "realm1", extra={})
    runner.run(LTCComponent)
    runner.run(XMRComponent) # <- is not being called


if __name__ == "__main__":

    try:
        main()
    except KeyboardInterrupt:
        quit()

    except Exception as e:
        print(time.time(), e)

My knowledge of the autobahn library is limited, and I'm afraid the documentation isn't improving my situation much. Am I overlooking something here? A function, a parameter, which would enable me to either compound my components or run them both at once?

Perhaps a similar solution as provided here, which implements an alternative ApplicationRunner ?


Related Topics

Running two ApplicationSessions in twisted

Running Autobahn ApplicationRunner in Thread

Autobahn.wamp.ApplicationSession Source

Autobahn.wamp.Applicationrunner Source


As Requested, the Traceback from @stovfl's answer using multithreading code:

Exception in thread Thread-2:
Traceback (most recent call last):
  File "/home/nils/anaconda3/lib/python3.5/threading.py", line     914, in _bootstrap_inner
    self.run()
  File "/home/nils/git/tools/gemini_wss/t2.py", line 27, in run
    self.appRunner.run(self.__ApplicationSession)
  File "/home/nils/anaconda3/lib/python3.5/site-packages/autobahn-    0.14.1-py3.5.egg/autobahn/asyncio/wamp.py", line 143,     in run
    transport_factory = WampWebSocketClientFactory(create,         url=self.url,                 serializers=self.serializers)
  File "/home/nils/anaconda3/lib/python3.5/site-packages/autobahn-    0.14.1-py3.5.egg/autobahn/asyncio/websocket.py", line     319, in __init__
    WebSocketClientFactory.__init__(self, *args, **kwargs)
  File "/home/nils/anaconda3/lib/python3.5/site-packages/autobahn-    0.14.1-py3.5.egg/autobahn/asyncio/websocket.py", line     268, in __init__
    self.loop = loop or asyncio.get_event_loop()
  File "/home/nils/anaconda3/lib/python3.5/asyncio/events.py",     line 626, in get_event_loop
    return get_event_loop_policy().get_event_loop()
  File "/home/nils/anaconda3/lib/python3.5/asyncio/events.py",     line 572, in get_event_loop
    % threading.current_thread().name)
RuntimeError: There is no current event loop in thread 'Thread-2'.
Exception in thread Thread-1:
**Same as in Thread-2**
...
RuntimeError: There is no current event loop in thread 'Thread-1'.

2条回答
beautiful°
2楼-- · 2020-03-25 00:12

Following the approach you linked for twisted I managed to get same behaviour with asyncio setting start_loop=False

import asyncio
from autobahn.asyncio.wamp import ApplicationSession, ApplicationRunner

runner1 = ApplicationRunner(url, realm, extra={'cli_id': 1})
coro1 = runner1.run(MyApplicationSession, start_loop=False)

runner2 = ApplicationRunner(url, realm, extra={'cli_id': 2})
coro2 = runner2.run(MyApplicationSession, start_loop=False)

asyncio.get_event_loop().run_until_complete(coro1)
asyncio.get_event_loop().run_until_complete(coro2)
asyncio.get_event_loop().run_forever()

class MyApplicationSession(ApplicationSession):

    def __init__(self, cfg):
        super().__init__(cfg)
        self.cli_id = cfg.extra['cli_id']

   def onJoin(self, details):
        print("session attached", self.cli_id)
查看更多
我只想做你的唯一
3楼-- · 2020-03-25 00:15

As I see from the traceback, we only reach Step 2 of 4

From the asyncio docs:
This module provides infrastructure for writing single-threaded concurrent code using coroutines, multiplexing I/O access over sockets and other resources

So I drop my first proposal using multithreading.
I could imagin the following three options:

  1. Do it with multiprocessing instead of multithreading
  2. Do it with coroutine inside asyncio loop
  3. Switch between channels in def onJoin(self, details)

Second proposal, first option using multiprocessing.
I can start two asyncio loops, so appRunner.run(...) should work.

You can use one class ApplicationSession if the channel are the only different. If you need to pass different class ApplicationSession add it to the args=

class __ApplicationSession(ApplicationSession):
        # ...
        try:
            yield from self.subscribe(onTicker, self.config.extra['channel'])
        except Exception as e:
            # ...

import multiprocessing as mp
import time

def ApplicationRunner_process(realm, channel):
        appRunner = ApplicationRunner("wss://api.poloniex.com:443", realm, extra={'channel': channel})
        appRunner.run(__ApplicationSession)

if __name__ == "__main__":
    AppRun = [{'process':None, 'channel':'BTC_LTC'},
              {'process': None, 'channel': 'BTC_XMR'}]

    for app in AppRun:
        app['process'] =  mp.Process(target = ApplicationRunner_process, args = ('realm1', app['channel'] ))
        app['process'].start()
        time.sleep(0.1)

    AppRun[0]['process'].join()
    AppRun[1]['process'].join()
查看更多
登录 后发表回答