使用aiohttp集成Autobahn|Python

3

我正在尝试将 web服务器集成到Crossbar+Autobahn系统架构中。

更详细地说,当服务器收到特定的API调用时,它必须向Crossbar路由器发布一条消息。 我在官方存储库中看到了此示例,但我不知道如何将其集成到我的应用程序中。

理想情况下,我希望能够做到这一点

# class SampleTaskController(object):
async def handle_get_request(self, request: web.Request) -> web.Response:
    self.publisher.publish('com.myapp.topic1', 'Hello World!')
    return web.HTTPOk()

其中selfSampleTaskController(object)的一个实例,它定义了Web服务器的所有路由处理程序。

def main(argv):
    cfg_path = "./task_cfg.json"
    if len(argv) > 1:
        cfg_path = argv[0]

    logging.basicConfig(level=logging.DEBUG,
                        format=LOG_FORMAT)

    loop = zmq.asyncio.ZMQEventLoop()
    asyncio.set_event_loop(loop)

    app = web.Application(loop=loop)
    with open(cfg_path, 'r') as f:
        task_cfg = json.load(f)
        task_cfg['__cfg_path'] = cfg_path
        controller = SampleTaskController(task_cfg)
        controller.restore()
        app['controller'] = controller

        controller.setup_routes(app)

        app.on_startup.append(controller.on_startup)
        app.on_cleanup.append(controller.on_cleanup)
        web.run_app(app,
                    host=task_cfg['webserver_address'],
                    port=task_cfg['webserver_port'])

注意,我正在使用zmq.asyncio.ZMQEventLoop,因为服务器还在监听zmq套接字,这是在controller.on_startup方法中配置的。
我尝试使用wampy将消息发布到Crossbar,而不是使用autobahn,它可以正常工作,但是autobahn订阅者无法正确解析消息。
# autobahn subscriber
class ClientSession(ApplicationSession):
    async def onJoin(self, details):

        self.log.info("Client session joined {details}", details=details)

        self.log.info("Connected:  {details}", details=details)

        self._ident = details.authid
        self._type = u'Python'

        self.log.info("Component ID is  {ident}", ident=self._ident)
        self.log.info("Component type is  {type}", type=self._type)

        # SUBSCRIBE

        def gen_on_something(thing):
            def on_something(counter, id, type):
                print('----------------------------')
                self.log.info("'on_{something}' event, counter value: {message}",something=thing, message=counter)
                self.log.info("from component {id} ({type})", id=id, type=type)
            return on_something

        await self.subscribe(gen_on_something('landscape'), 'landscape')
        await self.subscribe(gen_on_something('nature'), 'nature')

-

# wampy publisher
async def publish():
    router = Crossbar(config_path='./crossbar.json')
    logging.getLogger().debug(router.realm)
    logging.getLogger().debug(router.url)
    logging.getLogger().debug(router.port)

    client = Client(router=router)
    client.start()

    result = client.publish(topic="nature", message=0)
    logging.getLogger().debug(result)

使用此配置,订阅者可以接收已发布的消息,但在解析消息时会出现异常。
TypeError: on_something() got an unexpected keyword argument 'message'

2
你可以在全局实例化一个名为mySessionObjectClientSession 对象 (或将其放置在Web应用程序的app对象中), 然后实例化一个 ApplicationRunner, 并调用 run(make=mySessionObject, start_loop=False)。接下来你就可以在 Web 路由处理器内使用 session 对象发布事件了。我们可能需要将其作为一个示例。 - oberstet
@oberstet 我尝试了你建议的方法,但是当我尝试发布一条消息时,我遇到了一个“TransportLost”异常。代码要点 当我运行它时,Docker容器中运行默认的Crossabar图像,并且Autobahn|Python示例正常工作,因此不应该是环境问题。 - gmanzoli
@oberstet 我想我找到了你所提到的例子 https://github.com/crossbario/autobahn-python/blob/master/examples/asyncio/wamp/component/backend.py 但是我无法运行它,因为从pip安装的autobahn|python没有autobahn.asyncio.component模块。 - gmanzoli
组件尚未发布。它们只是已经合并:https://github.com/crossbario/autobahn-python/pull/872/files#diff-87b06272782923eeb1d608f4202ccbdd 应该是下一个版本的一部分。 无论如何,这个新的API与您遇到的问题无关。 - Yourstruly
1个回答

1

最近我尝试同时使用aiohttp和autobahn。我改编了crossbar文档中的示例(原本使用twisted),得到了以下代码:

import asyncio
import logging

from aiohttp import web
from aiohttp.web_exceptions import HTTPOk, HTTPInternalServerError
from autobahn.asyncio.component import Component

# Setup logging
logger = logging.getLogger(__name__)


class WebApplication(object):
    """
    A simple Web application that publishes an event every time the
    url "/" is visited.
    """

    count = 0

    def __init__(self, app, wamp_comp):
        self._app = app
        self._wamp = wamp_comp
        self._session = None  # "None" while we're disconnected from WAMP router

        # associate ourselves with WAMP session lifecycle
        self._wamp.on('join', self._initialize)
        self._wamp.on('leave', self._uninitialize)

        self._app.router.add_get('/', self._render_slash)

    def _initialize(self, session, details):
        logger.info("Connected to WAMP router (session: %s, details: %s)", session, details)
        self._session = session

    def _uninitialize(self, session, reason):
        logger.warning("Lost WAMP connection (session: %s, reason: %s)", session, reason)
        self._session = None

    async def _render_slash(self, request):
        if self._session is None:
            return HTTPInternalServerError(reason="No WAMP session")
        self.count += 1
        self._session.publish(u"com.myapp.request_served", self.count, count=self.count)
        return HTTPOk(text="Published to 'com.myapp.request_served'")


def main():
    REALM = "crossbardemo"
    BROKER_URI = "ws://wamp_broker:9091/ws"
    BIND_ADDR = "0.0.0.0"
    BIND_PORT = 8080

    logging.basicConfig(
        level='DEBUG',
        format='[%(asctime)s %(levelname)s %(name)s:%(lineno)d]: %(message)s')

    logger.info("Starting aiohttp backend at %s:%s...", BIND_ADDR, BIND_PORT)
    loop = asyncio.get_event_loop()

    component = Component(
        transports=BROKER_URI,
        realm=REALM,
    )
    component.start(loop=loop)

    # When not using run() we also must start logging ourselves.
    import txaio
    txaio.start_logging(level='info')

    app = web.Application(
        loop=loop)

    _ = WebApplication(app, component)

    web.run_app(app, host=BIND_ADDR, port=BIND_PORT)


if __name__ == '__main__':
    main()

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接