asyncio in a coroutine: RuntimeError: no running event loop

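I'm writing multiprocessing code that runs perfectly in Python 3.7. However, I want one of the parallel processes to perform an IO operation with AsyncIO, awaiting it in order to get better performance, and I haven't been able to get it to run.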

Ubuntu 18.04, Python 3.7, AsyncIO, pipenv (all pip libraries installed)

In particular, this method works as expected with multithreading; that is the part I want to replace with AsyncIO.

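I have searched Google and tried creating the loop in the main() function, and also only inside the intended coroutine. I have looked at examples and read about this new async way of doing things, but so far nothing has worked.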

Here is the app.py code, run with python app.py:

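import sys
import traceback
import logging
import logging.config  # needed for logging.config.dictConfig() below
import pdb  # used by pdb.post_mortem() in main()
import asyncio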

from config import DEBUG
from config import log_config
from <some-module> import <some-class>

if DEBUG:
    logging.config.dictConfig(log_config())
else:
    logging.basicConfig(
        level=logging.DEBUG, format='%(relativeCreated)6d %(threadName)s %(message)s')
logger = logging.getLogger(__name__)


def main():
    try:
        <some> = <some-class>([
            'some-data1.csv',
            'some-data2.csv'
            ])
        <some>.run()

    except:

        traceback.print_exc()
        pdb.post_mortem()

    sys.exit(0)


if __name__ == '__main__':

    asyncio.run(main())

Here is where I define the class in question:

    _sql_client = SQLServer()
    _blob_client = BlockBlobStore()
    _keys = KeyVault()
    _data_source = _keys.fetch('some-data')
    #  Multiprocessing
    _manager = mp.Manager()
    _ns = _manager.Namespace()

    def __init__(self, list_of_collateral_files: list) -> None:
        ...

    @timeit
    def _get_filter_collateral(self, ns: mp.managers.NamespaceProxy) -> None:
        ...

    @timeit
    def _get_hours(self, ns: mp.managers.NamespaceProxy) -> None:
        ...

    @timeit
    def _load_original_bids(self, ns: mp.managers.NamespaceProxy) -> None:
        ...

    @timeit
    def _merge_bids_with_hours(self, ns: mp.managers.NamespaceProxy) -> None:
        ...

    @timeit
    def _get_collaterial_per_month(self, ns: mp.managers.NamespaceProxy) -> None:
        ...

    @timeit
    def _calc_bid_per_path(self) -> None:
        ...

    @timeit
    def run(self) -> None:
        ...

The method containing the async code is here:

    def _get_filter_collateral(self, ns: mp.managers.NamespaceProxy) -> None:

        all_files = self._blob_client.download_blobs(self._list_of_blob_files)

        _all_dfs = pd.DataFrame()
        async def read_task(file_: str) -> None:
            nonlocal _all_dfs
            df = pd.read_csv(StringIO(file_.content))
            _all_dfs = _all_dfs.append(df, sort=False)

        tasks = []
        loop = asyncio.new_event_loop()

        for file_ in all_files:
            tasks.append(asyncio.create_task(read_task(file_)))

        loop.run_until_complete(asyncio.wait(tasks))
        loop.close()

        _all_dfs['TOU'] = _all_dfs['TOU'].map(lambda x: 'OFFPEAK' if x == 'OFF' else 'ONPEAK')
        ns.dfs = _all_dfs

The method that runs the overall sequence and calls this async method is:

    def run(self) -> None:
        extract = []
        extract.append(mp.Process(target=self._get_filter_collateral, args=(self._ns, )))
        extract.append(mp.Process(target=self._get_hours, args=(self._ns, )))
        extract.append(mp.Process(target=self._load_original_bids, args=(self._ns, )))

        #  Start the parallel processes
        for process in extract:
            process.start()

        #  Await for database process to end
        extract[1].join()
        extract[2].join()

        #  Merge both database results
        self._merge_bids_with_hours(self._ns)

        extract[0].join()

        self._get_collaterial_per_month(self._ns)
        self._calc_bid_per_path()
        self._save_reports()
        self._upload_data()

The error message I get is:

Process Process-2:
Traceback (most recent call last):
  File "<some-path>/.pyenv/versions/3.7.4/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "<some-path>/.pyenv/versions/3.7.4/lib/python3.7/multiprocessing/process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
  File "<some-path>/src/azure/application/utils/lib.py", line 10, in timed
    result = method(*args, **kwargs)
  File "<some-path>/src/azure/application/caiso/main.py", line 104, in _get_filter_collateral
    tasks.append(asyncio.create_task(read_task(file_)))
  File "<some-path>/.pyenv/versions/3.7.4/lib/python3.7/asyncio/tasks.py", line 350, in create_task
    loop = events.get_running_loop()
RuntimeError: no running event loop
<some-path>/.pyenv/versions/3.7.4/lib/python3.7/multiprocessing/process.py:313: RuntimeWarning: coroutine '<some-class>._get_filter_collateral.<locals>.read_task' was never awaited
  traceback.print_exc()
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
DEBUG Calculating monthly collateral...
Traceback (most recent call last):
  File "app.py", line 25, in main
    caiso.run()
  File "<some-path>/src/azure/application/utils/lib.py", line 10, in timed
    result = method(*args, **kwargs)
  File "<some-path>/src/azure/application/caiso/main.py", line 425, in run
    self._get_collaterial_per_month(self._ns)
  File "<some-path>/src/azure/application/utils/lib.py", line 10, in timed
    result = method(*args, **kwargs)
  File "<some-path>/src/azure/application/caiso/main.py", line 196, in _get_collaterial_per_month
    credit_margin = ns.dfs
  File "<some-path>/.pyenv/versions/3.7.4/lib/python3.7/multiprocessing/managers.py", line 1122, in __getattr__
    return callmethod('__getattribute__', (key,))
  File "<some-path>/.pyenv/versions/3.7.4/lib/python3.7/multiprocessing/managers.py", line 834, in _callmethod
    raise convert_to_error(kind, result)
AttributeError: 'Namespace' object has no attribute 'dfs'
> <some-path>/.pyenv/versions/3.7.4/lib/python3.7/multiprocessing/managers.py(834)_callmethod()
-> raise convert_to_error(kind, result)
(Pdb)

3 Answers

Judging from the traceback, it looks like you are trying to add tasks to an event loop that is not running:
<some-path>/.pyenv/versions/3.7.4/lib/python3.7/multiprocessing/process.py:313: RuntimeWarning: coroutine '<some-class>._get_filter_collateral.<locals>.read_task' was never awaited
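The event loop has only just been created and has not started running, so tasks cannot be attached to it this way: as the traceback shows, asyncio.create_task() calls events.get_running_loop() internally, and that raises RuntimeError when no loop is running in the current thread.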
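The rule can be checked in isolation. The following is a small sketch (the function names are made up for illustration) showing that asyncio.get_running_loop() fails in ordinary synchronous code, that a loop returned by asyncio.new_event_loop() is not running until something drives it, and that the same call succeeds inside a coroutine started by asyncio.run():

```python
import asyncio


def check_outside_loop() -> str:
    # Plain synchronous code: no event loop is running in this thread.
    try:
        asyncio.get_running_loop()
    except RuntimeError as exc:
        return str(exc)
    return "a loop is running"


async def check_inside_loop() -> bool:
    # Inside a coroutine driven by asyncio.run(), a loop is running.
    return asyncio.get_running_loop().is_running()


outside_message = check_outside_loop()

# Creating a loop does not start it.
fresh_loop = asyncio.new_event_loop()
fresh_loop_running = fresh_loop.is_running()
fresh_loop.close()

inside_ok = asyncio.run(check_inside_loop())

print(outside_message)     # no running event loop
print(fresh_loop_running)  # False
print(inside_ok)           # True
```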
The following example reproduces the same error by adding tasks and then trying to wait for all of them to finish:
import asyncio
async def func(num):
    print('My name is func {0}...'.format(num))

loop = asyncio.get_event_loop()
tasks = list()
for i in range(5):
    tasks.append(asyncio.create_task(func(i)))
loop.run_until_complete(asyncio.wait(tasks))
loop.close()

Running it gives:

Traceback (most recent call last):
  File "C:/tmp/stack_overflow.py", line 42, in <module>
    tasks.append(asyncio.create_task(func(i)))
  File "C:\Users\Amiram\AppData\Local\Programs\Python\Python37-32\lib\asyncio\tasks.py", line 324, in create_task
    loop = events.get_running_loop()
RuntimeError: no running event loop
sys:1: RuntimeWarning: coroutine 'func' was never awaited

The solution is simple, though: add the tasks to the loop you created instead of asking asyncio to do it. The only line that needs to change is this one:

tasks.append(asyncio.create_task(func(i)))

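Create the task through your newly created loop instead of through asyncio. You can do this because it is your loop: loop.create_task() schedules the task on that specific loop even before it starts running, whereas asyncio.create_task() has to look for a running loop.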

So the new line should look like this:

tasks.append(loop.create_task(func(i)))
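Applied to the pattern in the question (a loop from asyncio.new_event_loop() driven with run_until_complete()), the fix looks roughly like the sketch below. It is only a minimal sketch: the file names and the body of read_task are hypothetical placeholders for the downloaded blobs and the pandas parsing in the question.

```python
import asyncio

# Hypothetical stand-ins for the downloaded blob files in the question.
files = ["a.csv", "b.csv", "c.csv"]
results = []


async def read_task(name: str) -> None:
    # Placeholder for the real parsing work.
    await asyncio.sleep(0)
    results.append(name.upper())


loop = asyncio.new_event_loop()
try:
    # loop.create_task() binds each task to this loop, so the loop
    # does not need to be running yet when the tasks are created.
    tasks = [loop.create_task(read_task(f)) for f in files]
    loop.run_until_complete(asyncio.wait(tasks))
finally:
    loop.close()

print(sorted(results))  # ['A.CSV', 'B.CSV', 'C.CSV']
```

The try/finally makes sure the loop is closed even if one of the tasks raises.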

Another solution is to run an async function and create the tasks inside it; because the loop is already running at that point, asyncio can attach the tasks to it. For example:

import asyncio
async def func(num):
    print('Starting func {0}...'.format(num))
    await asyncio.sleep(0.1)
    print('Ending func {0}...'.format(num))

loop = asyncio.get_event_loop()
async def create_tasks_func():
    tasks = list()
    for i in range(5):
        tasks.append(asyncio.create_task(func(i)))
    await asyncio.wait(tasks)
loop.run_until_complete(create_tasks_func())
loop.close()

This simple change produces the following output:

Starting func 0...
Starting func 1...
Starting func 2...
Starting func 3...
Starting func 4...
Ending func 0...
Ending func 2...
Ending func 4...
Ending func 1...
Ending func 3...
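On Python 3.7 and later, asyncio.run() can take over the get_event_loop()/run_until_complete()/close() bookkeeping: it creates a fresh loop, runs the given coroutine until it finishes, and closes the loop. Below is a minimal sketch of the second solution written that way. It swaps asyncio.wait for asyncio.gather, which returns results in the order the tasks were passed, and func returns a value purely for illustration.

```python
import asyncio


async def func(num: int) -> int:
    # Simulated IO wait; the return value is only for illustration.
    await asyncio.sleep(0.01)
    return num * 10


async def create_tasks_func() -> list:
    # Safe here: asyncio.run() is already driving a loop when this runs.
    tasks = [asyncio.create_task(func(i)) for i in range(5)]
    # gather() returns the results in the same order as the tasks.
    return await asyncio.gather(*tasks)


results = asyncio.run(create_tasks_func())
print(results)  # [0, 10, 20, 30, 40]
```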

Another answer:

As an alternative to the examples above, use:
loop = asyncio.get_event_loop()
# main() and start() are your own coroutine functions
t = [loop.create_task(main()), loop.create_task(start())]
loop.run_until_complete(asyncio.wait(t))
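A self-contained version of that snippet is sketched below. main() and start() here are hypothetical placeholder coroutines, and the sketch uses asyncio.new_event_loop() rather than asyncio.get_event_loop(), because recent Python versions deprecate calling get_event_loop() when no event loop has been set. asyncio.wait() returns a (done, pending) pair of task sets.

```python
import asyncio

events_log = []


async def main() -> None:
    # Hypothetical placeholder: waits briefly, then records itself.
    await asyncio.sleep(0.01)
    events_log.append("main")


async def start() -> None:
    # Hypothetical placeholder: records itself immediately.
    events_log.append("start")


loop = asyncio.new_event_loop()
try:
    # Both tasks are bound to this loop even though it is not running yet.
    t = [loop.create_task(main()), loop.create_task(start())]
    done, pending = loop.run_until_complete(asyncio.wait(t))
finally:
    loop.close()

print(len(done), len(pending))  # 2 0
print(events_log)               # ['start', 'main']
```

main() is scheduled first, but it suspends at the sleep, so start() records itself before main() resumes.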

Content from Stack Overflow; see the original English version there.