如何在Django Channels中使用多线程AsyncConsumer?

5

我最近一周一直在使用Django Channels,但是我对runworker并行性不满意。

例如,我有一个MQTT客户端,当它接收到消息时会发布channels,很基础的功能。

async def treat_message(msg):
    channel_layer = get_channel_layer()
    payload = json.loads(msg.payload, encoding="utf-8")

    await channel_layer.send("mqtt", {
        "type": "value.change",
        "message": payload
    })

这是发送成功了。无论我想要发送多少,它都会被发送到redis队列中。发送到频道mqtt

然后我运行工作程序,该程序将通过以下方式重新定向队列中的消息以供mqtt使用:

python manage.py runworker mqtt
2018-09-12 16:33:42,232 - INFO - runworker - Running worker for channels ['mqtt']

这是问题的起点。以下是AsyncConsumer读取数据的内容:
class MQTTConsumer(AsyncConsumer):
    async def value_change(self, event):
        await asyncio.sleep(5)
        print("I received changes : {}".format(event["message"]))

我设置了一个sleep以模拟任务的业务。这就是我的问题所在:异步消费者不是多线程的!当我向通道发送两条消息时,消费者处理第二条消息需要10秒,而不是如果它是多线程则只需要5秒。如下所示。

2018-09-12 16:45:25,271 - INFO - runworker - Running worker for channels ['mqtt']
2018-09-12 16:45:32,559 - INFO - mqtt - I received changes : {'oui': 'non'}
2018-09-12 16:45:37,561 - INFO - mqtt - I received changes : {'oui': 'non'}
2018-09-12 16:45:42,563 - INFO - mqtt - I received changes : {'oui': 'non'}
2018-09-12 16:45:47,565 - INFO - mqtt - I received changes : {'oui': 'non'}

任何与该主题有关的情报都将是极大的帮助,提前感谢!

编辑: 我发现唯一管理它的方法是创建一个执行器,其中包含工人来异步完成任务。但我不确定它在部署目的上的效率。

def handle_mqtt(event):
    time.sleep(3)
    logger.info("I received changes : {}".format(event["message"]))


class MQTTConsumer(AsyncConsumer):
    def __init__(self, scope):
        super().__init__(scope)
        self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=4)

    async def value_change(self, event):
        loop = asyncio.get_event_loop()
        future = loop.run_in_executor(self.executor, handle_mqtt, event)
1个回答

0

这是目前的设计。

是的,这是预期的设计,因为这是最安全的方式(如果您不知道它,它可以防止竞态条件)。如果您愿意并行运行消息,只需在需要时启动自己的协程(使用 asyncio.create_task),确保在关闭时清理它们并等待它们。这会增加相当多的开销,所以希望我们将来会在消费者中提供一个选择加入模式,但现在我们只提供安全选项。

https://github.com/django/channels/issues/1203


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接