我正在通过继承asyncio.Future
类来创建一个作业类,并带有一些自定义属性,期望该作业实例的函数与原始Future相同。
当我在协程中调用job.set_result
时,它会引发Future对象未初始化错误
,然后我尝试通过调用asyncio.ensure_future
来初始化该Future,但是仍然出现相同的错误。
我尝试了更多,并发现通常通过loop.create_future()
来创建Future,但是没有选项可以创建我的自定义Future。
以下是一个示例,我该如何让我的自定义Future被初始化?
import asyncio
from dataclasses import dataclass
@dataclass
class Job(asyncio.Future):
job_task: Callable
real_future: asyncio.Future = None
something: str = None
def schedule(self):
async def run():
res = await self.job_task()
self.set_result(res) # raise error, future not initialized
return res
self.real_future = asyncio.ensure_future(run())
async def main():
async def task():
await asyncio.sleep(1)
return 1
job = Job(task)
job.schedule()
await job
asyncio.run(main())