aboutsummaryrefslogtreecommitdiffstats
path: root/sources/pyside6
diff options
context:
space:
mode:
authorAdrian Herrmann <adrian.herrmann@qt.io>2024-05-30 14:05:58 +0200
committerAdrian Herrmann <adrian.herrmann@qt.io>2024-06-28 13:29:33 +0200
commitc73c656082a18893154525ec2a8474c0d421748e (patch)
tree2f6835abe37686b7c62728164f479f81690367cd /sources/pyside6
parent3e9e1009b58b1a67cfdc4447367d1274e15cdafe (diff)
QtAsyncio: Use modern typing syntax
We can already use the modern typing syntax introduced with Python 3.10 in 3.9 via future statement definitions, even before we raise the minimum Python version to 3.10. Task-number: PYSIDE-769 Task-number: PYSIDE-2786 Change-Id: I560d0c25f3503217f920906a5b26193282b0247b Reviewed-by: Christian Tismer <tismer@stackless.com>
Diffstat (limited to 'sources/pyside6')
-rw-r--r--sources/pyside6/PySide6/QtAsyncio/__init__.py11
-rw-r--r--sources/pyside6/PySide6/QtAsyncio/events.py99
-rw-r--r--sources/pyside6/PySide6/QtAsyncio/futures.py31
-rw-r--r--sources/pyside6/PySide6/QtAsyncio/tasks.py27
4 files changed, 81 insertions, 87 deletions
diff --git a/sources/pyside6/PySide6/QtAsyncio/__init__.py b/sources/pyside6/PySide6/QtAsyncio/__init__.py
index 5e656753e..d284e4f6a 100644
--- a/sources/pyside6/PySide6/QtAsyncio/__init__.py
+++ b/sources/pyside6/PySide6/QtAsyncio/__init__.py
@@ -8,8 +8,9 @@ from .events import (
from .futures import QAsyncioFuture
from .tasks import QAsyncioTask
+from typing import Coroutine, Any
+
import asyncio
-import typing
__all__ = [
"QAsyncioEventLoopPolicy", "QAsyncioEventLoop",
@@ -18,11 +19,9 @@ __all__ = [
]
-def run(coro: typing.Optional[typing.Coroutine] = None,
- keep_running: bool = True,
- quit_qapp: bool = True, *,
- handle_sigint: bool = False,
- debug: typing.Optional[bool] = None) -> typing.Any:
+def run(coro: Coroutine | None = None,
+ keep_running: bool = True, quit_qapp: bool = True, *, handle_sigint: bool = False,
+ debug: bool | None = None) -> Any:
"""
Run the QtAsyncio event loop.
diff --git a/sources/pyside6/PySide6/QtAsyncio/events.py b/sources/pyside6/PySide6/QtAsyncio/events.py
index f104f96ea..57f1a91b1 100644
--- a/sources/pyside6/PySide6/QtAsyncio/events.py
+++ b/sources/pyside6/PySide6/QtAsyncio/events.py
@@ -8,6 +8,8 @@ from PySide6.QtCore import (QCoreApplication, QDateTime, QDeadlineTimer,
from . import futures
from . import tasks
+from typing import Any, Callable
+
import asyncio
import collections.abc
import concurrent.futures
@@ -17,7 +19,6 @@ import os
import signal
import socket
import subprocess
-import typing
import warnings
__all__ = [
@@ -41,7 +42,7 @@ class QAsyncioExecutorWrapper(QObject):
the actual callable for the executor into this new event loop.
"""
- def __init__(self, func: typing.Callable, *args: typing.Tuple) -> None:
+ def __init__(self, func: Callable, *args: tuple) -> None:
super().__init__()
self._loop: QEventLoop
self._func = func
@@ -103,7 +104,7 @@ class QAsyncioEventLoopPolicy(asyncio.AbstractEventLoopPolicy):
# this instance is shut down every time.
self._quit_qapp = quit_qapp
- self._event_loop: typing.Optional[asyncio.AbstractEventLoop] = None
+ self._event_loop: asyncio.AbstractEventLoop | None = None
if handle_sigint:
signal.signal(signal.SIGINT, signal.SIG_DFL)
@@ -113,7 +114,7 @@ class QAsyncioEventLoopPolicy(asyncio.AbstractEventLoopPolicy):
self._event_loop = QAsyncioEventLoop(self._application, quit_qapp=self._quit_qapp)
return self._event_loop
- def set_event_loop(self, loop: typing.Optional[asyncio.AbstractEventLoop]) -> None:
+ def set_event_loop(self, loop: asyncio.AbstractEventLoop | None) -> None:
self._event_loop = loop
def new_event_loop(self) -> asyncio.AbstractEventLoop:
@@ -189,7 +190,7 @@ class QAsyncioEventLoop(asyncio.BaseEventLoop, QObject):
self._quit_from_outside = False
# A set of all asynchronous generators that are currently running.
- self._asyncgens: typing.Set[collections.abc.AsyncGenerator] = set()
+ self._asyncgens: set[collections.abc.AsyncGenerator] = set()
# Starting with Python 3.11, this must be an instance of
# ThreadPoolExecutor.
@@ -200,14 +201,14 @@ class QAsyncioEventLoop(asyncio.BaseEventLoop, QObject):
# asynchonrous generator raises an exception when closed, and two, if
# an exception is raised during the execution of a task. Currently, the
# default exception handler just prints the exception to the console.
- self._exception_handler: typing.Optional[typing.Callable] = self.default_exception_handler
+ self._exception_handler: Callable | None = self.default_exception_handler
# The task factory, if set with set_task_factory(). Otherwise, a new
# task is created with the QAsyncioTask constructor.
- self._task_factory: typing.Optional[typing.Callable] = None
+ self._task_factory: Callable | None = None
# The future that is currently being awaited with run_until_complete().
- self._future_to_complete: typing.Optional[futures.QAsyncioFuture] = None
+ self._future_to_complete: futures.QAsyncioFuture | None = None
self._debug = bool(os.getenv("PYTHONASYNCIODEBUG", False))
@@ -226,7 +227,7 @@ class QAsyncioEventLoop(asyncio.BaseEventLoop, QObject):
future.get_loop().stop()
def run_until_complete(self,
- future: futures.QAsyncioFuture) -> typing.Any: # type: ignore[override]
+ future: futures.QAsyncioFuture) -> Any: # type: ignore[override]
if self.is_closed():
raise RuntimeError("Event loop is closed")
if self.is_running():
@@ -320,7 +321,7 @@ class QAsyncioEventLoop(asyncio.BaseEventLoop, QObject):
self._asyncgens.clear()
async def shutdown_default_executor(self, # type: ignore[override]
- timeout: typing.Union[int, float, None] = None) -> None:
+ timeout: int | float | None = None) -> None:
shutdown_successful = False
if timeout is not None:
deadline_timer = QDeadlineTimer(int(timeout * 1000))
@@ -345,51 +346,46 @@ class QAsyncioEventLoop(asyncio.BaseEventLoop, QObject):
# Scheduling callbacks
- def _call_soon_impl(self, callback: typing.Callable, *args: typing.Any,
- context: typing.Optional[contextvars.Context] = None,
- is_threadsafe: typing.Optional[bool] = False) -> asyncio.Handle:
+ def _call_soon_impl(self, callback: Callable, *args: Any,
+ context: contextvars.Context | None = None,
+ is_threadsafe: bool | None = False) -> asyncio.Handle:
return self._call_later_impl(0, callback, *args, context=context,
is_threadsafe=is_threadsafe)
- def call_soon(self, callback: typing.Callable, *args: typing.Any,
- context: typing.Optional[contextvars.Context] = None) -> asyncio.Handle:
+ def call_soon(self, callback: Callable, *args: Any,
+ context: contextvars.Context | None = None) -> asyncio.Handle:
return self._call_soon_impl(callback, *args, context=context, is_threadsafe=False)
- def call_soon_threadsafe(self, callback: typing.Callable, *args: typing.Any,
- context:
- typing.Optional[contextvars.Context] = None) -> asyncio.Handle:
+ def call_soon_threadsafe(self, callback: Callable, *args: Any,
+ context: contextvars.Context | None = None) -> asyncio.Handle:
if self.is_closed():
raise RuntimeError("Event loop is closed")
if context is None:
context = contextvars.copy_context()
return self._call_soon_impl(callback, *args, context=context, is_threadsafe=True)
- def _call_later_impl(self, delay: typing.Union[int, float],
- callback: typing.Callable, *args: typing.Any,
- context: typing.Optional[contextvars.Context] = None,
- is_threadsafe: typing.Optional[bool] = False) -> asyncio.TimerHandle:
+ def _call_later_impl(self, delay: int | float, callback: Callable, *args: Any,
+ context: contextvars.Context | None = None,
+ is_threadsafe: bool | None = False) -> asyncio.TimerHandle:
if not isinstance(delay, (int, float)):
raise TypeError("delay must be an int or float")
return self._call_at_impl(self.time() + delay, callback, *args, context=context,
is_threadsafe=is_threadsafe)
- def call_later(self, delay: typing.Union[int, float],
- callback: typing.Callable, *args: typing.Any,
- context: typing.Optional[contextvars.Context] = None) -> asyncio.TimerHandle:
+ def call_later(self, delay: int | float, callback: Callable, *args: Any,
+ context: contextvars.Context | None = None) -> asyncio.TimerHandle:
return self._call_later_impl(delay, callback, *args, context=context, is_threadsafe=False)
- def _call_at_impl(self, when: typing.Union[int, float],
- callback: typing.Callable, *args: typing.Any,
- context: typing.Optional[contextvars.Context] = None,
- is_threadsafe: typing.Optional[bool] = False) -> asyncio.TimerHandle:
+ def _call_at_impl(self, when: int | float, callback: Callable, *args: Any,
+ context: contextvars.Context | None = None,
+ is_threadsafe: bool | None = False) -> asyncio.TimerHandle:
""" All call_at() and call_later() methods map to this method. """
if not isinstance(when, (int, float)):
raise TypeError("when must be an int or float")
return QAsyncioTimerHandle(when, callback, args, self, context, is_threadsafe=is_threadsafe)
- def call_at(self, when: typing.Union[int, float],
- callback: typing.Callable, *args: typing.Any,
- context: typing.Optional[contextvars.Context] = None) -> asyncio.TimerHandle:
+ def call_at(self, when: int | float, callback: Callable, *args: Any,
+ context: contextvars.Context | None = None) -> asyncio.TimerHandle:
return self._call_at_impl(when, callback, *args, context=context, is_threadsafe=False)
def time(self) -> float:
@@ -401,9 +397,9 @@ class QAsyncioEventLoop(asyncio.BaseEventLoop, QObject):
return futures.QAsyncioFuture(loop=self)
def create_task(self, # type: ignore[override]
- coro: typing.Union[collections.abc.Generator, collections.abc.Coroutine],
- *, name: typing.Optional[str] = None,
- context: typing.Optional[contextvars.Context] = None) -> tasks.QAsyncioTask:
+ coro: collections.abc.Generator | collections.abc.Coroutine,
+ *, name: str | None = None,
+ context: contextvars.Context | None = None) -> tasks.QAsyncioTask:
if self._task_factory is None:
task = tasks.QAsyncioTask(coro, loop=self, name=name, context=context)
else:
@@ -412,12 +408,12 @@ class QAsyncioEventLoop(asyncio.BaseEventLoop, QObject):
return task
- def set_task_factory(self, factory: typing.Optional[typing.Callable]) -> None:
+ def set_task_factory(self, factory: Callable | None) -> None:
if factory is not None and not callable(factory):
raise TypeError("The task factory must be a callable or None")
self._task_factory = factory
- def get_task_factory(self) -> typing.Optional[typing.Callable]:
+ def get_task_factory(self) -> Callable | None:
return self._task_factory
# Opening network connections
@@ -561,8 +557,8 @@ class QAsyncioEventLoop(asyncio.BaseEventLoop, QObject):
# Executing code in thread or process pools
def run_in_executor(self,
- executor: typing.Optional[concurrent.futures.ThreadPoolExecutor],
- func: typing.Callable, *args: typing.Tuple) -> asyncio.futures.Future:
+ executor: concurrent.futures.ThreadPoolExecutor | None,
+ func: Callable, *args: tuple) -> asyncio.futures.Future:
if self.is_closed():
raise RuntimeError("Event loop is closed")
if executor is None:
@@ -580,28 +576,27 @@ class QAsyncioEventLoop(asyncio.BaseEventLoop, QObject):
)
def set_default_executor(self,
- executor: typing.Optional[
- concurrent.futures.ThreadPoolExecutor]) -> None:
+ executor: concurrent.futures.ThreadPoolExecutor | None) -> None:
if not isinstance(executor, concurrent.futures.ThreadPoolExecutor):
raise TypeError("The executor must be a ThreadPoolExecutor")
self._default_executor = executor
# Error Handling API
- def set_exception_handler(self, handler: typing.Optional[typing.Callable]) -> None:
+ def set_exception_handler(self, handler: Callable | None) -> None:
if handler is not None and not callable(handler):
raise TypeError("The handler must be a callable or None")
self._exception_handler = handler
- def get_exception_handler(self) -> typing.Optional[typing.Callable]:
+ def get_exception_handler(self) -> Callable | None:
return self._exception_handler
- def default_exception_handler(self, context: typing.Dict[str, typing.Any]) -> None:
+ def default_exception_handler(self, context: dict[str, Any]) -> None:
# TODO
if context["message"]:
print(context["message"])
- def call_exception_handler(self, context: typing.Dict[str, typing.Any]) -> None:
+ def call_exception_handler(self, context: dict[str, Any]) -> None:
if self._exception_handler is not None:
self._exception_handler(context)
@@ -644,9 +639,9 @@ class QAsyncioHandle():
CANCELLED = enum.auto()
DONE = enum.auto()
- def __init__(self, callback: typing.Callable, args: typing.Tuple,
- loop: QAsyncioEventLoop, context: typing.Optional[contextvars.Context],
- is_threadsafe: typing.Optional[bool] = False) -> None:
+ def __init__(self, callback: Callable, args: tuple,
+ loop: QAsyncioEventLoop, context: contextvars.Context | None,
+ is_threadsafe: bool | None = False) -> None:
self._callback = callback
self._args = args
self._loop = loop
@@ -661,7 +656,7 @@ class QAsyncioHandle():
def _start(self) -> None:
self._schedule_event(self._timeout, lambda: self._cb())
- def _schedule_event(self, timeout: int, func: typing.Callable) -> None:
+ def _schedule_event(self, timeout: int, func: Callable) -> None:
# Do not schedule events from asyncio when the app is quit from outside
# the event loop, as this would cause events to be enqueued after the
# event loop was destroyed.
@@ -701,9 +696,9 @@ class QAsyncioHandle():
class QAsyncioTimerHandle(QAsyncioHandle, asyncio.TimerHandle):
- def __init__(self, when: float, callback: typing.Callable, args: typing.Tuple,
- loop: QAsyncioEventLoop, context: typing.Optional[contextvars.Context],
- is_threadsafe: typing.Optional[bool] = False) -> None:
+ def __init__(self, when: float, callback: Callable, args: tuple,
+ loop: QAsyncioEventLoop, context: contextvars.Context | None,
+ is_threadsafe: bool | None = False) -> None:
QAsyncioHandle.__init__(self, callback, args, loop, context, is_threadsafe)
self._when = when
diff --git a/sources/pyside6/PySide6/QtAsyncio/futures.py b/sources/pyside6/PySide6/QtAsyncio/futures.py
index 29d86ad3c..6b4415490 100644
--- a/sources/pyside6/PySide6/QtAsyncio/futures.py
+++ b/sources/pyside6/PySide6/QtAsyncio/futures.py
@@ -4,10 +4,11 @@ from __future__ import annotations
from . import events
+from typing import Any, Callable
+
import asyncio
import contextvars
import enum
-import typing
class QAsyncioFuture():
@@ -24,8 +25,8 @@ class QAsyncioFuture():
DONE_WITH_RESULT = enum.auto()
DONE_WITH_EXCEPTION = enum.auto()
- def __init__(self, *, loop: typing.Optional["events.QAsyncioEventLoop"] = None,
- context: typing.Optional[contextvars.Context] = None) -> None:
+ def __init__(self, *, loop: "events.QAsyncioEventLoop | None" = None,
+ context: contextvars.Context | None = None) -> None:
self._loop: "events.QAsyncioEventLoop"
if loop is None:
self._loop = asyncio.events.get_event_loop() # type: ignore[assignment]
@@ -34,13 +35,13 @@ class QAsyncioFuture():
self._context = context
self._state = QAsyncioFuture.FutureState.PENDING
- self._result: typing.Any = None
- self._exception: typing.Optional[BaseException] = None
+ self._result: Any = None
+ self._exception: BaseException | None = None
- self._cancel_message: typing.Optional[str] = None
+ self._cancel_message: str | None = None
# List of callbacks that are called when the future is done.
- self._callbacks: typing.List[typing.Callable] = list()
+ self._callbacks: list[Callable] = list()
def __await__(self):
if not self.done():
@@ -52,13 +53,13 @@ class QAsyncioFuture():
__iter__ = __await__
- def _schedule_callbacks(self, context: typing.Optional[contextvars.Context] = None):
+ def _schedule_callbacks(self, context: contextvars.Context | None = None):
""" A future can optionally have callbacks that are called when the future is done. """
for cb in self._callbacks:
self._loop.call_soon(
cb, self, context=context if context else self._context)
- def result(self) -> typing.Union[typing.Any, Exception]:
+ def result(self) -> Any | Exception:
if self._state == QAsyncioFuture.FutureState.DONE_WITH_RESULT:
return self._result
if self._state == QAsyncioFuture.FutureState.DONE_WITH_EXCEPTION and self._exception:
@@ -70,7 +71,7 @@ class QAsyncioFuture():
raise asyncio.CancelledError
raise asyncio.InvalidStateError
- def set_result(self, result: typing.Any) -> None:
+ def set_result(self, result: Any) -> None:
self._result = result
self._state = QAsyncioFuture.FutureState.DONE_WITH_RESULT
self._schedule_callbacks()
@@ -86,20 +87,20 @@ class QAsyncioFuture():
def cancelled(self) -> bool:
return self._state == QAsyncioFuture.FutureState.CANCELLED
- def add_done_callback(self, cb: typing.Callable, *,
- context: typing.Optional[contextvars.Context] = None) -> None:
+ def add_done_callback(self, cb: Callable, *,
+ context: contextvars.Context | None = None) -> None:
if self.done():
self._loop.call_soon(
cb, self, context=context if context else self._context)
else:
self._callbacks.append(cb)
- def remove_done_callback(self, cb: typing.Callable) -> int:
+ def remove_done_callback(self, cb: Callable) -> int:
original_len = len(self._callbacks)
self._callbacks = [_cb for _cb in self._callbacks if _cb != cb]
return original_len - len(self._callbacks)
- def cancel(self, msg: typing.Optional[str] = None) -> bool:
+ def cancel(self, msg: str | None = None) -> bool:
if self.done():
return False
self._state = QAsyncioFuture.FutureState.CANCELLED
@@ -107,7 +108,7 @@ class QAsyncioFuture():
self._schedule_callbacks()
return True
- def exception(self) -> typing.Optional[BaseException]:
+ def exception(self) -> BaseException | None:
if self._state == QAsyncioFuture.FutureState.CANCELLED:
raise asyncio.CancelledError
if self.done():
diff --git a/sources/pyside6/PySide6/QtAsyncio/tasks.py b/sources/pyside6/PySide6/QtAsyncio/tasks.py
index 9b6b2c22b..bd7884838 100644
--- a/sources/pyside6/PySide6/QtAsyncio/tasks.py
+++ b/sources/pyside6/PySide6/QtAsyncio/tasks.py
@@ -5,20 +5,20 @@ from __future__ import annotations
from . import events
from . import futures
+from typing import Any
+
import asyncio
import collections.abc
import concurrent.futures
import contextvars
-import typing
class QAsyncioTask(futures.QAsyncioFuture):
""" https://docs.python.org/3/library/asyncio-task.html """
- def __init__(self, coro: typing.Union[collections.abc.Generator, collections.abc.Coroutine], *,
- loop: typing.Optional["events.QAsyncioEventLoop"] = None,
- name: typing.Optional[str] = None,
- context: typing.Optional[contextvars.Context] = None) -> None:
+ def __init__(self, coro: collections.abc.Generator | collections.abc.Coroutine, *,
+ loop: "events.QAsyncioEventLoop | None" = None, name: str | None = None,
+ context: contextvars.Context | None = None) -> None:
super().__init__(loop=loop, context=context)
self._coro = coro # The coroutine for which this task was created.
@@ -31,10 +31,10 @@ class QAsyncioTask(futures.QAsyncioFuture):
# The task step function executes the coroutine until it finishes,
# raises an exception or returns a future. If a future was returned,
# the task will await its completion (or exception).
- self._future_to_await: typing.Optional[asyncio.Future] = None
+ self._future_to_await: asyncio.Future | None = None
self._cancelled = False
- self._cancel_message: typing.Optional[str] = None
+ self._cancel_message: str | None = None
# https://docs.python.org/3/library/asyncio-extending.html#task-lifetime-support
asyncio._register_task(self) # type: ignore[arg-type]
@@ -54,17 +54,16 @@ class QAsyncioTask(futures.QAsyncioFuture):
class QtTaskApiMisuseError(Exception):
pass
- def set_result(self, result: typing.Any) -> None: # type: ignore[override]
+ def set_result(self, result: Any) -> None: # type: ignore[override]
# This function is not inherited from the Future APIs.
raise QAsyncioTask.QtTaskApiMisuseError("Tasks cannot set results")
- def set_exception(self, exception: typing.Any) -> None: # type: ignore[override]
+ def set_exception(self, exception: Any) -> None: # type: ignore[override]
# This function is not inherited from the Future APIs.
raise QAsyncioTask.QtTaskApiMisuseError("Tasks cannot set exceptions")
def _step(self,
- exception_or_future: typing.Union[
- BaseException, futures.QAsyncioFuture, None] = None) -> None:
+ exception_or_future: BaseException | futures.QAsyncioFuture | None = None) -> None:
"""
The step function is the heart of a task. It is scheduled in the event
loop repeatedly, executing the coroutine "step" by "step" (i.e.,
@@ -153,7 +152,7 @@ class QAsyncioTask(futures.QAsyncioFuture):
# https://docs.python.org/3/library/asyncio-extending.html#task-lifetime-support
asyncio._unregister_task(self) # type: ignore[arg-type]
- def get_stack(self, *, limit=None) -> typing.List[typing.Any]:
+ def get_stack(self, *, limit=None) -> list[Any]:
# TODO
raise NotImplementedError("QtTask.get_stack is not implemented")
@@ -161,7 +160,7 @@ class QAsyncioTask(futures.QAsyncioFuture):
# TODO
raise NotImplementedError("QtTask.print_stack is not implemented")
- def get_coro(self) -> typing.Union[collections.abc.Generator, collections.abc.Coroutine]:
+ def get_coro(self) -> collections.abc.Generator | collections.abc.Coroutine:
return self._coro
def get_name(self) -> str:
@@ -170,7 +169,7 @@ class QAsyncioTask(futures.QAsyncioFuture):
def set_name(self, value) -> None:
self._name = str(value)
- def cancel(self, msg: typing.Optional[str] = None) -> bool:
+ def cancel(self, msg: str | None = None) -> bool:
if self.done():
return False
self._cancel_message = msg