I am making a job class by inheriting from asyncio.Future with some custom attributes, and expect the job instance functions like the original Future.
When I call job.set_result inside a coroutine, it raises a Future object is not initialized error, then I tried to get future initialized by call asyncio.ensure_future and the same error appears.
I tried more and find that the future is usually created by loop.create_future(), however, there is no options to create my custom future.
Below is an example, How can I get my custom future initialized?
import asyncio
from dataclasses import dataclass
@dataclass
class Job(asyncio.Future):
job_task: Callable
real_future: asyncio.Future = None
something: str = None
def schedule(self):
async def run():
res = await self.job_task()
self.set_result(res) # raise error, future not initialized
return res
self.real_future = asyncio.ensure_future(run())
async def main():
async def task():
await asyncio.sleep(1)
return 1
job = Job(task)
job.schedule()
await job
asyncio.run(main())