I have not found a solution to my problem. I need to create a Python application with two threads, each of which is connected to a WAMP router using the autobahn library.
Here is the code of my experiment:
wampAddress = 'ws://172.17.3.139:8181/ws'
wampRealm = 's4t'

from threading import Thread
from autobahn.twisted.wamp import ApplicationRunner
from autobahn.twisted.wamp import ApplicationSession
from twisted.internet.defer import inlineCallbacks

class AutobahnMRS(ApplicationSession):
    @inlineCallbacks
    def onJoin(self, details):
        print("Sessio attached [Connect to WAMP Router]")

        def onMessage(*args):
            print args

        try:
            yield self.subscribe(onMessage, 'test')
            print ("Subscribed to topic: test")
        except Exception as e:
            print("Exception:" +e)

class AutobahnIM(ApplicationSession):
    @inlineCallbacks
    def onJoin(self, details):
        print("Sessio attached [Connect to WAMP Router]")
        try:
            yield self.publish('test','YOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOO')
            print ("Subscribed to topic: test")
        except Exception as e:
            print("Exception:" +e)

class ManageRemoteSystem:
    def __init__(self):
        self.runner = ApplicationRunner(url= wampAddress, realm = wampRealm)

    def start(self):
        self.runner.run(AutobahnMRS);

class InternalMessages:
    def __init__(self):
        self.runner = ApplicationRunner(url= wampAddress, realm = wampRealm)

    def start(self):
        self.runner.run(AutobahnIM);

#class S4tServer:
if __name__ == '__main__':
    server = ManageRemoteSystem()
    sendMessage = InternalMessages()
    thread1 = Thread(target = server.start())
    thread1.start()
    thread1.join()
    thread2 = Thread(target = sendMessage.start())
    thread2.start()
    thread2.join()
When I launch this Python application, only thread1 is started, and later, when I kill the application (Ctrl-C), the following error messages are shown:
Sessio attached [Connect to WAMP Router]
Subscribed to topic: test
^CTraceback (most recent call last):
  File "test_pub.py", line 71, in <module>
    p2 = multiprocessing.Process(target = server.start())
  File "test_pub.py", line 50, in start
    self.runner.run(AutobahnMRS);
  File "/usr/local/lib/python2.7/dist-packages/autobahn/twisted/wamp.py", line 175, in run
    reactor.run()
  File "/usr/local/lib/python2.7/dist-packages/twisted/internet/base.py", line 1191, in run
    self.startRunning(installSignalHandlers=installSignalHandlers)
  File "/usr/local/lib/python2.7/dist-packages/twisted/internet/base.py", line 1171, in startRunning
    ReactorBase.startRunning(self)
  File "/usr/local/lib/python2.7/dist-packages/twisted/internet/base.py", line 683, in startRunning
    raise error.ReactorNotRestartable()
twisted.internet.error.ReactorNotRestartable
I need to implement a single application that has its own functionality and that can also communicate with a WAMP router using the autobahn Python library.
In other words, I need a solution that can communicate with a WAMP router without the autobahn part blocking the rest of the application (I think the solution is to start two threads: one thread manages some functions and the second thread manages the autobahn part).
The scheme I proposed above has another problem: I need to send messages to a specific topic on the WAMP router from the part of the application running in the non-autobahn thread. This should be done by calling a specific function, without blocking the other functionality.
I hope I have given all the details.
Thanks a lot for any response
--------------------------------EDIT---------------------------------
After some research I have implemented what I need for the WebSocket protocol. The code is as follows:
import Queue
import threading
import time

from twisted.internet import reactor
from twisted.python import log
from autobahn.twisted.websocket import (WebSocketClientFactory,
                                        WebSocketClientProtocol, connectWS)

# ----- twisted ----------
class _WebSocketClientProtocol(WebSocketClientProtocol):
    def __init__(self, factory):
        self.factory = factory

    def onOpen(self):
        #log.debug("Client connected")
        self.factory.protocol_instance = self
        self.factory.base_client._connected_event.set()

class _WebSocketClientFactory(WebSocketClientFactory):
    def __init__(self, *args, **kwargs):
        WebSocketClientFactory.__init__(self, *args, **kwargs)
        self.protocol_instance = None
        self.base_client = None

    def buildProtocol(self, addr):
        return _WebSocketClientProtocol(self)
# ------ end twisted -------

class BaseWBClient(object):
    def __init__(self, websocket_settings):
        #self.settings = websocket_settings
        # instance to be set by the own factory
        self.factory = None
        # this event will be triggered on onOpen()
        self._connected_event = threading.Event()
        # queue to hold not yet dispatched messages
        self._send_queue = Queue.Queue()
        self._reactor_thread = None

    def connect(self):
        log.msg("Connecting to host:port")
        self.factory = _WebSocketClientFactory(
            "ws://host:port",
            debug=True)
        self.factory.base_client = self
        c = connectWS(self.factory)
        # run the reactor in a background thread, without signal handlers
        self._reactor_thread = threading.Thread(target=reactor.run,
                                                args=(False,))
        self._reactor_thread.daemon = True
        self._reactor_thread.start()

    def send_message(self, body):
        if not self._check_connection():
            return
        log.msg("Queing send")
        self._send_queue.put(body)
        # hand the actual send over to the reactor thread
        reactor.callFromThread(self._dispatch)

    def _check_connection(self):
        if not self._connected_event.wait(timeout=10):
            log.err("Unable to connect to server")
            self.close()
            return False
        return True

    def _dispatch(self):
        log.msg("Dispatching")
        while True:
            try:
                body = self._send_queue.get(block=False)
            except Queue.Empty:
                break
            self.factory.protocol_instance.sendMessage(body)

    def close(self):
        reactor.callFromThread(reactor.stop)

def Ppippo(coda):
    while True:
        coda.send_message('YOOOOOOOO')
        time.sleep(5)

if __name__ == '__main__':
    ws_setting = {'host':'', 'port':}
    client = BaseWBClient(ws_setting)
    t1 = threading.Thread(client.connect())
    t11 = threading.Thread(Ppippo(client))
    t11.start()
    t1.start()
The previous code works fine, but I need to translate it to operate on the WAMP protocol instead of WebSocket.
Does anyone know how I can solve this?
The bad news is that Autobahn is using the Twisted main loop, so you can't run it in two threads at once.
The good news is that you don't need to run it in two threads to run two things, and two threads would be more complicated anyway.
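As an aside, the way the threads are created in the question is what produces the exact symptoms described. Thread(target = server.start()) calls server.start() right away, in the main thread, and passes its return value to Thread, so the reactor blocks before any thread exists. Once Ctrl-C stops it, sendMessage.start() tries to run the reactor a second time, which raises ReactorNotRestartable. A minimal standard-library sketch of the difference follows; the start function and the thread names are made up for illustration and only stand in for the question's methods:

```python
import threading

# Name of the thread that runs this script (normally "MainThread").
caller = threading.current_thread().name

calls = []

def start():
    # Record which thread actually executes the function body.
    calls.append(threading.current_thread().name)

# Wrong: start() runs right here, in the calling thread. Thread receives
# its return value (None) as the target and does nothing when started.
t_wrong = threading.Thread(target=start(), name="worker-wrong")
t_wrong.start()
t_wrong.join()

# Right: pass the function object itself; the new thread calls it.
t_right = threading.Thread(target=start, name="worker-right")
t_right.start()
t_right.join()

print(calls)
```

The first entry in calls is the calling thread's name, not "worker-wrong", which is why only one session ever appeared to start in the question. Fixing the target would still not help here, though, for the reason given above.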
The API to get started with multiple applications is a bit confusing, because you have two ApplicationRunner objects, and it looks at first glance that the way you run an application in autobahn is to call ApplicationRunner.run.
However, ApplicationRunner is simply a convenience that wraps up the stuff that sets up the application and the stuff that runs the main loop; the real meat of the work happens in WampWebSocketClientFactory.
In order to achieve what you want, you just need to get rid of the threads, and run the main loop yourself, making the ApplicationRunner instances simply set up their applications.
In order to achieve this, you'll need to change the last part of your program to do this: