使用Asyncio同时运行Dash(或Flask)服务器和另一个协程

4
我创建了一个 Dash 应用程序来展示另一段代码收集的信息,我想使用 Python 中的 asyncio 模块同时运行它们。
我的代码使用异步函数,而基于 Flask 的 Dash 应用程序阻止任何其他东西在提供服务时执行。
我不确定这是否需要涉及打开更多线程。
这是我的当前代码,它只运行主协程。
async def main():
    some code here...

    while True:
        try:
            await client.handle_message()
        except ConnectionClosedError as error:
            logger.error(error)
    
        for strategy in strategies:
            await asyncio.create_task(...)
            some code here...

async def run_dashboard():
    app = create_app()
    app.run_server('0.0.0.0', 5000, debug=False)


if __name__ == '__main__':
    some code here...

    # Currently just runs the main coroutine
    asyncio.run(main())

我如何同时运行main和run_dashboard?


有任何反馈吗? - Artiom Kozyrev
4个回答

7
坦率地说,在一个进程中将Dash(Flask)与一些异步工作结合起来不是一个好的设计,考虑在不同的进程中运行Flask和异步活动(即应用程序)。
尽管如此,如果您仍然想在一个进程中运行所有内容,我可以给您提供以下的工作示例,请遵循注释并在有问题时提问:
from flask import Flask, jsonify
import asyncio
from threading import Thread

# ** Async Part **


async def some_print_task():
    """Some async function"""
    while True:
        await asyncio.sleep(2)
        print("Some Task")


async def another_task():
    """Another async function"""
    while True:
        await asyncio.sleep(3)
        print("Another Task")


async def async_main():
    """Main async function"""
    await asyncio.gather(some_print_task(), another_task())


def async_main_wrapper():
    """Not async Wrapper around async_main to run it as target function of Thread"""
    asyncio.run(async_main())

# *** Flask Part ***:


app = Flask(__name__)


@app.route("/", methods=["GET"])
def index():
    """just some function"""
    return jsonify({"hello": "world"})


if __name__ == '__main__':
    # run all async stuff in another thread
    th = Thread(target=async_main_wrapper)
    th.start()
    # run Flask server
    app.run(host="0.0.0.0", port=9999)
    th.join()

@orie 很高兴听到这个消息。 - Artiom Kozyrev
1
这对我起作用了 - 使用 Dash 作为应用程序而不是 Flask。 - pcrx20
2
你能详细说明为什么这是糟糕的设计吗?即,有哪些风险? - Sterling Butters
2
@SterlingButters 当时回答这个问题时,据我所知 Flask 没有对 asyncio 代码提供“本地”支持(我已经有1.5年没有使用 Flask 了),但他们在某种程度上添加了支持 https://flask.palletsprojects.com/en/2.2.x/async-await/。那么为什么不好呢?因为 asyncio eventloop 只能在一个线程中工作。你可以在另一个线程中创建多个 asyncio eventloop,但这会降低应用程序的性能。同样的故事也出现在一个线程中的 asyncio + 在其他线程中处理 http 请求。你只会得到更差的性能。 - Artiom Kozyrev
1
@SterlingButters,因此应按以下方式使用asyncio http框架:1)一个线程中的一个事件循环将处理所有请求2)在Thread中读/写文件,因为它不是异步操作3)将所有CPU绑定任务移动到子进程中或使用消息队列(例如RabbitMQ)并将任务移动到其他微服务! - Artiom Kozyrev

1

这里有一些代码运行一个Dash应用程序(收集航班数据-由Jose Portilla-Udemy提供),以及线程运行Dash应用程序和一些异步任务。


from flask import Flask, jsonify
import asyncio
from threading import Thread


# Dash
import dash
import dash_core_components as dcc
import dash_html_components as html
from dash.dependencies import Input, Output
import requests
import plotly.graph_objects as go 


# ** Async Part **

async def some_print_task():
    """Some async function"""
    while True:
        await asyncio.sleep(2)
        print("Some Task")


async def another_task():
    """Another async function"""
    while True:
        await asyncio.sleep(3)
        print("Another Task")


async def async_main():
    """Main async function"""
    await asyncio.gather(some_print_task(), another_task())


def async_main_wrapper():
    """Not async Wrapper around async_main to run it as target function of Thread"""
    asyncio.run(async_main())


# *** Dash Part ***:
app = dash.Dash()

app.layout = html.Div([

    # html.Div([
    #     html.Iframe(src="https://www.flightradar24.com",
    #                 height=500,width=200)
    # ]),
    html.Div([
        html.Pre(id='counter-text',children='Active Flights Worldwide'),
        dcc.Graph(id='live-update-graph',style={'width':1200}),
        dcc.Interval(   id='interval-component',
                        interval=6000,
                        n_intervals=0)
    ])
])
counter_list = []

@app.callback(  Output('counter-text','children'),
                [Input('interval-component','n_intervals')])
def update_layout(n):
    url = "https://data-live.flightradar24.com/zones/fcgi/feed.js?faa=1&mlat=1&flarm=1&adsb=1&gnd=1&air=1&vehicles=1&estimated=1&stats=1"
    res = requests.get(url, headers={'User-Agent' : 'Mozilla/5.0'})  # A fake header is necessary to access the site
    data = res.json()
    counter = 0
    for element in data["stats"]["total"]:
        counter += data["stats"]["total"][element]

    counter_list.append(counter)
    return "Active flights Worldwide: {}".format(counter)

@app.callback(  Output('live-update-graph','figure'),
                [Input('interval-component','n_intervals')])
def update_graph(n):
    fig = go.Figure(data=[
            go.Scatter(x=list(range(len(counter_list))),
                        y=counter_list,
                        mode='lines+markers')
    ])

    return fig

if __name__ == '__main__':
    # run all async stuff in another thread
    th = Thread(target=async_main_wrapper)
    th.start()
    # run Flask server
    # app.run(host="0.0.0.0", port=9999)
    app.run_server(debug=True)
    th.join()


debug=True 时,如何防止 Dash 启动两个线程? - sobek

0
如果你真的想让它在一个进程中运行,以下方法对我有效:

from functools import partial
from threading import Thread    


partial_run = partial(app.run, host="0.0.0.0", port=5000, debug=True, use_reloader=False)
t = Thread(target=partial_run)
t.start()
asyncio.run(main())

-1
在后台线程中运行run_dashboard。请参考文档
async def run():
    await asyncio.gather(
        asyncio.to_thread(run_dashboard),
        main()
    )

asyncio.run(run())

请注意,asyncio.to_thread 是 Python 3.9 版本中的新函数。 对于早于 3.9 版本的 Python,请复制以下代码threads.py

我的操作好像没有生效。你能帮我检查一下我的帖子吗? - pcrx20
这个该怎么工作?run_dashboard是一个协程,必须要使用await。线程无法做到这一点。这需要将协程重新定义为常规函数。 - MisterMiyagi

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接