如何使用 autobahn 和 asyncio 实现一个交互式的 WebSocket 客户端？(How can I implement an interactive websocket client with autobahn asyncio?)

2019-10-21 05:47发布

我正在尝试使用 autobahn|python 和 asyncio 实现一个 WebSocket/WAMP 客户端。虽然它基本上可以工作，但仍有一些部分我一直没弄明白。

我真正想做的是在 Qt5/QML 中实现 WAMP，但就目前而言，这似乎是一条更简单的途径。

下面这个大部分从网上复制来的简化客户端确实可以工作。它在 onJoin 发生时读取时间服务。

我想要做的是从外部来源触发这次读取。

我采取的迂回办法是：在一个线程中运行 asyncio 事件循环，然后通过套接字发送命令来触发读取。但到目前为止，我还没弄清楚应该把这个例程/协程放在哪里，才能让 reader 例程找到它。

我猜想应该有更简单的做法，但我还没有找到。欢迎提出建议。

#!/usr/bin/python3
try:
    import asyncio
except ImportError:
    ## Trollius >= 0.3 was renamed
    import trollius as asyncio

from autobahn.asyncio import wamp, websocket
import threading
import time
from socket import socketpair

# Connected socket pair used to signal the event-loop thread: the loop
# watches rsock for readability, and the main thread writes to wsock.
rsock, wsock = socketpair()

def reader():
    """Read up to 100 bytes from ``rsock`` and print them as decoded text."""
    print("Received:", rsock.recv(100).decode())

class MyFrontendComponent(wamp.ApplicationSession):
    """WAMP session that queries ``com.timeservice.now`` once it has joined."""

    def onConnect(self):
        """Join the realm ``realm1``."""
        self.join(u"realm1")

    @asyncio.coroutine
    def onJoin(self, details):
        """Call the remote time procedure and print its result or the error."""
        print('joined')
        # Call the remote procedure; on failure report the error and stop.
        try:
            current_time = yield from self.call(u'com.timeservice.now')
        except Exception as exc:
            print("Error: {}".format(exc))
            return
        print("Current time from time service: {}".format(current_time))

    def onLeave(self, details):
        """Disconnect the session when it leaves the realm."""
        self.disconnect()

    def onDisconnect(self):
        """Stop the current event loop."""
        event_loop = asyncio.get_event_loop()
        event_loop.stop()



def start_aloop():
    """Run the WAMP client on a fresh asyncio event loop in the calling thread.

    Creates a new event loop and installs it as the current loop, registers
    ``reader`` as the read callback for ``rsock``, connects the WebSocket
    transport to 127.0.0.1:8080 and then runs the loop until it is stopped.

    ``session_factory`` is read from module scope, so it must be assigned
    before this function is called.
    """
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        transport_factory = websocket.WampWebSocketClientFactory(
            session_factory, debug=False, debug_wamp=False)
        loop.add_reader(rsock, reader)
        loop.run_until_complete(
            loop.create_connection(transport_factory, '127.0.0.1', 8080))
        loop.run_forever()
    finally:
        # Close the loop even if the connection attempt raises, so the
        # loop's resources are always released.
        loop.close()

if __name__ == '__main__':
    # start_aloop() reads session_factory from module scope, so it has to be
    # assigned before the thread is started.
    session_factory = wamp.ApplicationSessionFactory()
    session_factory.session = MyFrontendComponent

    ## Run the asyncio event loop in a separate thread
    print('starting thread')
    thread = threading.Thread(target=start_aloop)
    thread.start()
    # Fixed delay before signalling the loop thread -- presumably to give it
    # time to start and connect; this is not a real synchronisation.
    time.sleep(5)
    print("IN MAIN")
    # Emulate an outside call: data written to wsock arrives on rsock
    wsock.send(b'a byte string')

Answer 1:

你可以使用 loop.sock_accept 在事件循环内异步地监听套接字。只需在 onConnect 或 onJoin 中调用一个用于设置套接字的协程即可：

try:
    import asyncio
except ImportError:
    ## Trollius >= 0.3 was renamed
    import trollius as asyncio

from autobahn.asyncio import wamp, websocket
import socket

class MyFrontendComponent(wamp.ApplicationSession):
    """WAMP session that calls the time service on join and on every
    incoming TCP connection to localhost:8889.
    """

    def onConnect(self):
        """Join the realm ``realm1``."""
        self.join(u"realm1")

    @asyncio.coroutine
    def setup_socket(self):
        """Listen on localhost:8889 and call the time service per connection.

        Runs forever: each accepted connection triggers one call to
        ``call_timeservice`` and is then closed.
        """
        # Create a non-blocking listening socket
        self.sock = socket.socket()
        self.sock.setblocking(False)
        self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.sock.bind(('localhost', 8889))
        self.sock.listen(5)
        loop = asyncio.get_event_loop()
        # Wait for connections to come in. When one arrives,
        # call the time service and disconnect immediately.
        while True:
            conn, _ = yield from loop.sock_accept(self.sock)
            try:
                yield from self.call_timeservice()
            finally:
                # Close the accepted connection even if this task is
                # cancelled while the remote call is in flight.
                conn.close()

    @asyncio.coroutine
    def onJoin(self, details):
        """Start the socket listener, then call the time service once."""
        print('joined')
        # Setup our socket server.  asyncio.async() cannot be used here:
        # ``async`` is a reserved word since Python 3.7, which makes that
        # spelling a SyntaxError; ensure_future() is its replacement.
        # Keep a reference so the task is not garbage-collected mid-run.
        self._socket_task = asyncio.ensure_future(self.setup_socket())

        ## call a remote procedure
        ##
        yield from self.call_timeservice()

    @asyncio.coroutine
    def call_timeservice(self):
        """Call ``com.timeservice.now`` and print the result or the error."""
        try:
            now = yield from self.call(u'com.timeservice.now')
        except Exception as e:
            print("Error: {}".format(e))
        else:
            print("Current time from time service: {}".format(now))

    ... # The rest is the same


Answer 2:

感谢你的回复，dano。这不完全是我需要的解决方案，但它为我指明了正确的方向。是的，我希望由外部触发，让客户端发起远程 RPC 调用。

我想出了下面的方案，它允许我传入一个字符串来指定具体要调用的过程（尽管目前只实现了一个）。

以下是我的实现，不过我不确定它是否足够优雅。

import asyncio
from autobahn.asyncio import wamp, websocket
import threading
import time
import socket


# Connected socket pair: the session reads commands from rsock inside the
# event loop, and the main thread writes them to wsock.
rsock, wsock = socket.socketpair()

class MyFrontendComponent(wamp.ApplicationSession):
    """WAMP session that calls the procedures named by commands received
    on the module-level ``rsock`` socket.
    """

    def onConnect(self):
        """Join the realm ``realm1``."""
        self.join(u"realm1")

    @asyncio.coroutine
    def setup_socket(self):
        """Receive procedure names from ``rsock`` and call each of them.

        Every chunk of up to 80 bytes read from the socket is decoded and
        passed to ``call_service``.  Returns when the peer closes its end.
        """
        # Switch the receiving socket to non-blocking mode for sock_recv()
        self.sock = rsock
        self.sock.setblocking(False)
        loop = asyncio.get_event_loop()
        # Wait for commands to come in. When one arrives, call the
        # procedure it names.
        # NOTE(review): a stream socket does not preserve message
        # boundaries, so closely spaced sends may arrive as a single chunk
        # -- confirm the senders are spaced out or add framing.
        while True:
            rcmd = yield from loop.sock_recv(self.sock, 80)
            if not rcmd:
                # An empty read means the peer closed the socket; stop
                # instead of spinning on empty commands.
                break
            yield from self.call_service(rcmd.decode())

    @asyncio.coroutine
    def onJoin(self, details):
        """Start the command-reading task."""
        # asyncio.async() cannot be used here: ``async`` is a reserved word
        # since Python 3.7, which makes that spelling a SyntaxError;
        # ensure_future() is its replacement.  Keep a reference so the task
        # is not garbage-collected mid-run.
        self._socket_task = asyncio.ensure_future(self.setup_socket())

    @asyncio.coroutine
    def call_service(self, rcmd):
        """Call the remote procedure named ``rcmd`` and print the outcome."""
        print(rcmd)
        try:
            now = yield from self.call(rcmd)
        except Exception as e:
            print("Error: {}".format(e))
        else:
            print("Current time from time service: {}".format(now))

    def onLeave(self, details):
        """Disconnect the session when it leaves the realm."""
        self.disconnect()

    def onDisconnect(self):
        """Stop the current event loop."""
        asyncio.get_event_loop().stop()



def start_aloop():
    """Run the WAMP client on a fresh asyncio event loop in the calling thread.

    Creates a new event loop and installs it as the current loop, connects
    the WebSocket transport to 127.0.0.1:8080 and then runs the loop until
    it is stopped.

    ``session_factory`` is read from module scope, so it must be assigned
    before this function is called.
    """
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        transport_factory = websocket.WampWebSocketClientFactory(
            session_factory, debug=False, debug_wamp=False)
        loop.run_until_complete(
            loop.create_connection(transport_factory, '127.0.0.1', 8080))
        loop.run_forever()
    finally:
        # Close the loop even if the connection attempt raises, so the
        # loop's resources are always released.
        loop.close()

if __name__ == '__main__':
    # start_aloop() looks session_factory up at module scope, so bind it
    # before the worker thread starts.
    session_factory = wamp.ApplicationSessionFactory()
    session_factory.session = MyFrontendComponent

    # Run the asyncio event loop in its own thread
    print('starting thread')
    thread = threading.Thread(target=start_aloop)
    thread.start()

    # Three times over: wait five seconds, then send the procedure name
    # to the session through the socket pair.
    for _ in range(3):
        time.sleep(5)
        wsock.send(b'com.timeservice.now')


文章来源: How can I implement an interactive websocket client with autobahn asyncio?