diff options
| author | Adrian Herrmann <adrian.herrmann@qt.io> | 2024-05-30 14:05:58 +0200 |
|---|---|---|
| committer | Adrian Herrmann <adrian.herrmann@qt.io> | 2024-06-28 13:29:33 +0200 |
| commit | c73c656082a18893154525ec2a8474c0d421748e (patch) | |
| tree | 2f6835abe37686b7c62728164f479f81690367cd /sources/pyside6 | |
| parent | 3e9e1009b58b1a67cfdc4447367d1274e15cdafe (diff) | |
QtAsyncio: Use modern typing syntax
We can already use the modern typing syntax introduced with Python 3.10
in 3.9 via future statement definitions, even before we raise the
minimum Python version to 3.10.
Task-number: PYSIDE-769
Task-number: PYSIDE-2786
Change-Id: I560d0c25f3503217f920906a5b26193282b0247b
Reviewed-by: Christian Tismer <tismer@stackless.com>
Diffstat (limited to 'sources/pyside6')
| -rw-r--r-- | sources/pyside6/PySide6/QtAsyncio/__init__.py | 11 | ||||
| -rw-r--r-- | sources/pyside6/PySide6/QtAsyncio/events.py | 99 | ||||
| -rw-r--r-- | sources/pyside6/PySide6/QtAsyncio/futures.py | 31 | ||||
| -rw-r--r-- | sources/pyside6/PySide6/QtAsyncio/tasks.py | 27 |
4 files changed, 81 insertions, 87 deletions
diff --git a/sources/pyside6/PySide6/QtAsyncio/__init__.py b/sources/pyside6/PySide6/QtAsyncio/__init__.py index 5e656753e..d284e4f6a 100644 --- a/sources/pyside6/PySide6/QtAsyncio/__init__.py +++ b/sources/pyside6/PySide6/QtAsyncio/__init__.py @@ -8,8 +8,9 @@ from .events import ( from .futures import QAsyncioFuture from .tasks import QAsyncioTask +from typing import Coroutine, Any + import asyncio -import typing __all__ = [ "QAsyncioEventLoopPolicy", "QAsyncioEventLoop", @@ -18,11 +19,9 @@ __all__ = [ ] -def run(coro: typing.Optional[typing.Coroutine] = None, - keep_running: bool = True, - quit_qapp: bool = True, *, - handle_sigint: bool = False, - debug: typing.Optional[bool] = None) -> typing.Any: +def run(coro: Coroutine | None = None, + keep_running: bool = True, quit_qapp: bool = True, *, handle_sigint: bool = False, + debug: bool | None = None) -> Any: """ Run the QtAsyncio event loop. diff --git a/sources/pyside6/PySide6/QtAsyncio/events.py b/sources/pyside6/PySide6/QtAsyncio/events.py index f104f96ea..57f1a91b1 100644 --- a/sources/pyside6/PySide6/QtAsyncio/events.py +++ b/sources/pyside6/PySide6/QtAsyncio/events.py @@ -8,6 +8,8 @@ from PySide6.QtCore import (QCoreApplication, QDateTime, QDeadlineTimer, from . import futures from . import tasks +from typing import Any, Callable + import asyncio import collections.abc import concurrent.futures @@ -17,7 +19,6 @@ import os import signal import socket import subprocess -import typing import warnings __all__ = [ @@ -41,7 +42,7 @@ class QAsyncioExecutorWrapper(QObject): the actual callable for the executor into this new event loop. """ - def __init__(self, func: typing.Callable, *args: typing.Tuple) -> None: + def __init__(self, func: Callable, *args: tuple) -> None: super().__init__() self._loop: QEventLoop self._func = func @@ -103,7 +104,7 @@ class QAsyncioEventLoopPolicy(asyncio.AbstractEventLoopPolicy): # this instance is shut down every time. self._quit_qapp = quit_qapp - self._event_loop: typing.Optional[asyncio.AbstractEventLoop] = None + self._event_loop: asyncio.AbstractEventLoop | None = None if handle_sigint: signal.signal(signal.SIGINT, signal.SIG_DFL) @@ -113,7 +114,7 @@ class QAsyncioEventLoopPolicy(asyncio.AbstractEventLoopPolicy): self._event_loop = QAsyncioEventLoop(self._application, quit_qapp=self._quit_qapp) return self._event_loop - def set_event_loop(self, loop: typing.Optional[asyncio.AbstractEventLoop]) -> None: + def set_event_loop(self, loop: asyncio.AbstractEventLoop | None) -> None: self._event_loop = loop def new_event_loop(self) -> asyncio.AbstractEventLoop: @@ -189,7 +190,7 @@ class QAsyncioEventLoop(asyncio.BaseEventLoop, QObject): self._quit_from_outside = False # A set of all asynchronous generators that are currently running. - self._asyncgens: typing.Set[collections.abc.AsyncGenerator] = set() + self._asyncgens: set[collections.abc.AsyncGenerator] = set() # Starting with Python 3.11, this must be an instance of # ThreadPoolExecutor. @@ -200,14 +201,14 @@ class QAsyncioEventLoop(asyncio.BaseEventLoop, QObject): # asynchonrous generator raises an exception when closed, and two, if # an exception is raised during the execution of a task. Currently, the # default exception handler just prints the exception to the console. - self._exception_handler: typing.Optional[typing.Callable] = self.default_exception_handler + self._exception_handler: Callable | None = self.default_exception_handler # The task factory, if set with set_task_factory(). Otherwise, a new # task is created with the QAsyncioTask constructor. - self._task_factory: typing.Optional[typing.Callable] = None + self._task_factory: Callable | None = None # The future that is currently being awaited with run_until_complete(). - self._future_to_complete: typing.Optional[futures.QAsyncioFuture] = None + self._future_to_complete: futures.QAsyncioFuture | None = None self._debug = bool(os.getenv("PYTHONASYNCIODEBUG", False)) @@ -226,7 +227,7 @@ class QAsyncioEventLoop(asyncio.BaseEventLoop, QObject): future.get_loop().stop() def run_until_complete(self, - future: futures.QAsyncioFuture) -> typing.Any: # type: ignore[override] + future: futures.QAsyncioFuture) -> Any: # type: ignore[override] if self.is_closed(): raise RuntimeError("Event loop is closed") if self.is_running(): @@ -320,7 +321,7 @@ class QAsyncioEventLoop(asyncio.BaseEventLoop, QObject): self._asyncgens.clear() async def shutdown_default_executor(self, # type: ignore[override] - timeout: typing.Union[int, float, None] = None) -> None: + timeout: int | float | None = None) -> None: shutdown_successful = False if timeout is not None: deadline_timer = QDeadlineTimer(int(timeout * 1000)) @@ -345,51 +346,46 @@ class QAsyncioEventLoop(asyncio.BaseEventLoop, QObject): # Scheduling callbacks - def _call_soon_impl(self, callback: typing.Callable, *args: typing.Any, - context: typing.Optional[contextvars.Context] = None, - is_threadsafe: typing.Optional[bool] = False) -> asyncio.Handle: + def _call_soon_impl(self, callback: Callable, *args: Any, + context: contextvars.Context | None = None, + is_threadsafe: bool | None = False) -> asyncio.Handle: return self._call_later_impl(0, callback, *args, context=context, is_threadsafe=is_threadsafe) - def call_soon(self, callback: typing.Callable, *args: typing.Any, - context: typing.Optional[contextvars.Context] = None) -> asyncio.Handle: + def call_soon(self, callback: Callable, *args: Any, + context: contextvars.Context | None = None) -> asyncio.Handle: return self._call_soon_impl(callback, *args, context=context, is_threadsafe=False) - def call_soon_threadsafe(self, callback: typing.Callable, *args: typing.Any, - context: - typing.Optional[contextvars.Context] = None) -> asyncio.Handle: + def call_soon_threadsafe(self, callback: Callable, *args: Any, + context: contextvars.Context | None = None) -> asyncio.Handle: if self.is_closed(): raise RuntimeError("Event loop is closed") if context is None: context = contextvars.copy_context() return self._call_soon_impl(callback, *args, context=context, is_threadsafe=True) - def _call_later_impl(self, delay: typing.Union[int, float], - callback: typing.Callable, *args: typing.Any, - context: typing.Optional[contextvars.Context] = None, - is_threadsafe: typing.Optional[bool] = False) -> asyncio.TimerHandle: + def _call_later_impl(self, delay: int | float, callback: Callable, *args: Any, + context: contextvars.Context | None = None, + is_threadsafe: bool | None = False) -> asyncio.TimerHandle: if not isinstance(delay, (int, float)): raise TypeError("delay must be an int or float") return self._call_at_impl(self.time() + delay, callback, *args, context=context, is_threadsafe=is_threadsafe) - def call_later(self, delay: typing.Union[int, float], - callback: typing.Callable, *args: typing.Any, - context: typing.Optional[contextvars.Context] = None) -> asyncio.TimerHandle: + def call_later(self, delay: int | float, callback: Callable, *args: Any, + context: contextvars.Context | None = None) -> asyncio.TimerHandle: return self._call_later_impl(delay, callback, *args, context=context, is_threadsafe=False) - def _call_at_impl(self, when: typing.Union[int, float], - callback: typing.Callable, *args: typing.Any, - context: typing.Optional[contextvars.Context] = None, - is_threadsafe: typing.Optional[bool] = False) -> asyncio.TimerHandle: + def _call_at_impl(self, when: int | float, callback: Callable, *args: Any, + context: contextvars.Context | None = None, + is_threadsafe: bool | None = False) -> asyncio.TimerHandle: """ All call_at() and call_later() methods map to this method. """ if not isinstance(when, (int, float)): raise TypeError("when must be an int or float") return QAsyncioTimerHandle(when, callback, args, self, context, is_threadsafe=is_threadsafe) - def call_at(self, when: typing.Union[int, float], - callback: typing.Callable, *args: typing.Any, - context: typing.Optional[contextvars.Context] = None) -> asyncio.TimerHandle: + def call_at(self, when: int | float, callback: Callable, *args: Any, + context: contextvars.Context | None = None) -> asyncio.TimerHandle: return self._call_at_impl(when, callback, *args, context=context, is_threadsafe=False) def time(self) -> float: @@ -401,9 +397,9 @@ class QAsyncioEventLoop(asyncio.BaseEventLoop, QObject): return futures.QAsyncioFuture(loop=self) def create_task(self, # type: ignore[override] - coro: typing.Union[collections.abc.Generator, collections.abc.Coroutine], - *, name: typing.Optional[str] = None, - context: typing.Optional[contextvars.Context] = None) -> tasks.QAsyncioTask: + coro: collections.abc.Generator | collections.abc.Coroutine, + *, name: str | None = None, + context: contextvars.Context | None = None) -> tasks.QAsyncioTask: if self._task_factory is None: task = tasks.QAsyncioTask(coro, loop=self, name=name, context=context) else: @@ -412,12 +408,12 @@ class QAsyncioEventLoop(asyncio.BaseEventLoop, QObject): return task - def set_task_factory(self, factory: typing.Optional[typing.Callable]) -> None: + def set_task_factory(self, factory: Callable | None) -> None: if factory is not None and not callable(factory): raise TypeError("The task factory must be a callable or None") self._task_factory = factory - def get_task_factory(self) -> typing.Optional[typing.Callable]: + def get_task_factory(self) -> Callable | None: return self._task_factory # Opening network connections @@ -561,8 +557,8 @@ class QAsyncioEventLoop(asyncio.BaseEventLoop, QObject): # Executing code in thread or process pools def run_in_executor(self, - executor: typing.Optional[concurrent.futures.ThreadPoolExecutor], - func: typing.Callable, *args: typing.Tuple) -> asyncio.futures.Future: + executor: concurrent.futures.ThreadPoolExecutor | None, + func: Callable, *args: tuple) -> asyncio.futures.Future: if self.is_closed(): raise RuntimeError("Event loop is closed") if executor is None: @@ -580,28 +576,27 @@ class QAsyncioEventLoop(asyncio.BaseEventLoop, QObject): ) def set_default_executor(self, - executor: typing.Optional[ - concurrent.futures.ThreadPoolExecutor]) -> None: + executor: concurrent.futures.ThreadPoolExecutor | None) -> None: if not isinstance(executor, concurrent.futures.ThreadPoolExecutor): raise TypeError("The executor must be a ThreadPoolExecutor") self._default_executor = executor # Error Handling API - def set_exception_handler(self, handler: typing.Optional[typing.Callable]) -> None: + def set_exception_handler(self, handler: Callable | None) -> None: if handler is not None and not callable(handler): raise TypeError("The handler must be a callable or None") self._exception_handler = handler - def get_exception_handler(self) -> typing.Optional[typing.Callable]: + def get_exception_handler(self) -> Callable | None: return self._exception_handler - def default_exception_handler(self, context: typing.Dict[str, typing.Any]) -> None: + def default_exception_handler(self, context: dict[str, Any]) -> None: # TODO if context["message"]: print(context["message"]) - def call_exception_handler(self, context: typing.Dict[str, typing.Any]) -> None: + def call_exception_handler(self, context: dict[str, Any]) -> None: if self._exception_handler is not None: self._exception_handler(context) @@ -644,9 +639,9 @@ class QAsyncioHandle(): CANCELLED = enum.auto() DONE = enum.auto() - def __init__(self, callback: typing.Callable, args: typing.Tuple, - loop: QAsyncioEventLoop, context: typing.Optional[contextvars.Context], - is_threadsafe: typing.Optional[bool] = False) -> None: + def __init__(self, callback: Callable, args: tuple, + loop: QAsyncioEventLoop, context: contextvars.Context | None, + is_threadsafe: bool | None = False) -> None: self._callback = callback self._args = args self._loop = loop @@ -661,7 +656,7 @@ class QAsyncioHandle(): def _start(self) -> None: self._schedule_event(self._timeout, lambda: self._cb()) - def _schedule_event(self, timeout: int, func: typing.Callable) -> None: + def _schedule_event(self, timeout: int, func: Callable) -> None: # Do not schedule events from asyncio when the app is quit from outside # the event loop, as this would cause events to be enqueued after the # event loop was destroyed. @@ -701,9 +696,9 @@ class QAsyncioHandle(): class QAsyncioTimerHandle(QAsyncioHandle, asyncio.TimerHandle): - def __init__(self, when: float, callback: typing.Callable, args: typing.Tuple, - loop: QAsyncioEventLoop, context: typing.Optional[contextvars.Context], - is_threadsafe: typing.Optional[bool] = False) -> None: + def __init__(self, when: float, callback: Callable, args: tuple, + loop: QAsyncioEventLoop, context: contextvars.Context | None, + is_threadsafe: bool | None = False) -> None: QAsyncioHandle.__init__(self, callback, args, loop, context, is_threadsafe) self._when = when diff --git a/sources/pyside6/PySide6/QtAsyncio/futures.py b/sources/pyside6/PySide6/QtAsyncio/futures.py index 29d86ad3c..6b4415490 100644 --- a/sources/pyside6/PySide6/QtAsyncio/futures.py +++ b/sources/pyside6/PySide6/QtAsyncio/futures.py @@ -4,10 +4,11 @@ from __future__ import annotations from . import events +from typing import Any, Callable + import asyncio import contextvars import enum -import typing class QAsyncioFuture(): @@ -24,8 +25,8 @@ class QAsyncioFuture(): DONE_WITH_RESULT = enum.auto() DONE_WITH_EXCEPTION = enum.auto() - def __init__(self, *, loop: typing.Optional["events.QAsyncioEventLoop"] = None, - context: typing.Optional[contextvars.Context] = None) -> None: + def __init__(self, *, loop: "events.QAsyncioEventLoop | None" = None, + context: contextvars.Context | None = None) -> None: self._loop: "events.QAsyncioEventLoop" if loop is None: self._loop = asyncio.events.get_event_loop() # type: ignore[assignment] @@ -34,13 +35,13 @@ class QAsyncioFuture(): self._context = context self._state = QAsyncioFuture.FutureState.PENDING - self._result: typing.Any = None - self._exception: typing.Optional[BaseException] = None + self._result: Any = None + self._exception: BaseException | None = None - self._cancel_message: typing.Optional[str] = None + self._cancel_message: str | None = None # List of callbacks that are called when the future is done. - self._callbacks: typing.List[typing.Callable] = list() + self._callbacks: list[Callable] = list() def __await__(self): if not self.done(): @@ -52,13 +53,13 @@ class QAsyncioFuture(): __iter__ = __await__ - def _schedule_callbacks(self, context: typing.Optional[contextvars.Context] = None): + def _schedule_callbacks(self, context: contextvars.Context | None = None): """ A future can optionally have callbacks that are called when the future is done. """ for cb in self._callbacks: self._loop.call_soon( cb, self, context=context if context else self._context) - def result(self) -> typing.Union[typing.Any, Exception]: + def result(self) -> Any | Exception: if self._state == QAsyncioFuture.FutureState.DONE_WITH_RESULT: return self._result if self._state == QAsyncioFuture.FutureState.DONE_WITH_EXCEPTION and self._exception: @@ -70,7 +71,7 @@ class QAsyncioFuture(): raise asyncio.CancelledError raise asyncio.InvalidStateError - def set_result(self, result: typing.Any) -> None: + def set_result(self, result: Any) -> None: self._result = result self._state = QAsyncioFuture.FutureState.DONE_WITH_RESULT self._schedule_callbacks() @@ -86,20 +87,20 @@ class QAsyncioFuture(): def cancelled(self) -> bool: return self._state == QAsyncioFuture.FutureState.CANCELLED - def add_done_callback(self, cb: typing.Callable, *, - context: typing.Optional[contextvars.Context] = None) -> None: + def add_done_callback(self, cb: Callable, *, + context: contextvars.Context | None = None) -> None: if self.done(): self._loop.call_soon( cb, self, context=context if context else self._context) else: self._callbacks.append(cb) - def remove_done_callback(self, cb: typing.Callable) -> int: + def remove_done_callback(self, cb: Callable) -> int: original_len = len(self._callbacks) self._callbacks = [_cb for _cb in self._callbacks if _cb != cb] return original_len - len(self._callbacks) - def cancel(self, msg: typing.Optional[str] = None) -> bool: + def cancel(self, msg: str | None = None) -> bool: if self.done(): return False self._state = QAsyncioFuture.FutureState.CANCELLED @@ -107,7 +108,7 @@ class QAsyncioFuture(): self._schedule_callbacks() return True - def exception(self) -> typing.Optional[BaseException]: + def exception(self) -> BaseException | None: if self._state == QAsyncioFuture.FutureState.CANCELLED: raise asyncio.CancelledError if self.done(): diff --git a/sources/pyside6/PySide6/QtAsyncio/tasks.py b/sources/pyside6/PySide6/QtAsyncio/tasks.py index 9b6b2c22b..bd7884838 100644 --- a/sources/pyside6/PySide6/QtAsyncio/tasks.py +++ b/sources/pyside6/PySide6/QtAsyncio/tasks.py @@ -5,20 +5,20 @@ from __future__ import annotations from . import events from . import futures +from typing import Any + import asyncio import collections.abc import concurrent.futures import contextvars -import typing class QAsyncioTask(futures.QAsyncioFuture): """ https://docs.python.org/3/library/asyncio-task.html """ - def __init__(self, coro: typing.Union[collections.abc.Generator, collections.abc.Coroutine], *, - loop: typing.Optional["events.QAsyncioEventLoop"] = None, - name: typing.Optional[str] = None, - context: typing.Optional[contextvars.Context] = None) -> None: + def __init__(self, coro: collections.abc.Generator | collections.abc.Coroutine, *, + loop: "events.QAsyncioEventLoop | None" = None, name: str | None = None, + context: contextvars.Context | None = None) -> None: super().__init__(loop=loop, context=context) self._coro = coro # The coroutine for which this task was created. @@ -31,10 +31,10 @@ class QAsyncioTask(futures.QAsyncioFuture): # The task step function executes the coroutine until it finishes, # raises an exception or returns a future. If a future was returned, # the task will await its completion (or exception). - self._future_to_await: typing.Optional[asyncio.Future] = None + self._future_to_await: asyncio.Future | None = None self._cancelled = False - self._cancel_message: typing.Optional[str] = None + self._cancel_message: str | None = None # https://docs.python.org/3/library/asyncio-extending.html#task-lifetime-support asyncio._register_task(self) # type: ignore[arg-type] @@ -54,17 +54,16 @@ class QAsyncioTask(futures.QAsyncioFuture): class QtTaskApiMisuseError(Exception): pass - def set_result(self, result: typing.Any) -> None: # type: ignore[override] + def set_result(self, result: Any) -> None: # type: ignore[override] # This function is not inherited from the Future APIs. raise QAsyncioTask.QtTaskApiMisuseError("Tasks cannot set results") - def set_exception(self, exception: typing.Any) -> None: # type: ignore[override] + def set_exception(self, exception: Any) -> None: # type: ignore[override] # This function is not inherited from the Future APIs. raise QAsyncioTask.QtTaskApiMisuseError("Tasks cannot set exceptions") def _step(self, - exception_or_future: typing.Union[ - BaseException, futures.QAsyncioFuture, None] = None) -> None: + exception_or_future: BaseException | futures.QAsyncioFuture | None = None) -> None: """ The step function is the heart of a task. It is scheduled in the event loop repeatedly, executing the coroutine "step" by "step" (i.e., @@ -153,7 +152,7 @@ class QAsyncioTask(futures.QAsyncioFuture): # https://docs.python.org/3/library/asyncio-extending.html#task-lifetime-support asyncio._unregister_task(self) # type: ignore[arg-type] - def get_stack(self, *, limit=None) -> typing.List[typing.Any]: + def get_stack(self, *, limit=None) -> list[Any]: # TODO raise NotImplementedError("QtTask.get_stack is not implemented") @@ -161,7 +160,7 @@ class QAsyncioTask(futures.QAsyncioFuture): # TODO raise NotImplementedError("QtTask.print_stack is not implemented") - def get_coro(self) -> typing.Union[collections.abc.Generator, collections.abc.Coroutine]: + def get_coro(self) -> collections.abc.Generator | collections.abc.Coroutine: return self._coro def get_name(self) -> str: @@ -170,7 +169,7 @@ class QAsyncioTask(futures.QAsyncioFuture): def set_name(self, value) -> None: self._name = str(value) - def cancel(self, msg: typing.Optional[str] = None) -> bool: + def cancel(self, msg: str | None = None) -> bool: if self.done(): return False self._cancel_message = msg |
