blob: 178701dc25bc3de09b0b3ff708dcfde0f747aea1 (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
|
# Copyright (C) 2024 The Qt Company Ltd.
# SPDX-License-Identifier: LicenseRef-Qt-Commercial OR GPL-3.0-only WITH Qt-GPL-exception-1.0
from __future__ import annotations
'''Test cases for QtAsyncio'''
import os
import sys
import unittest
from pathlib import Path
# Append the directory one level above this file's own directory to sys.path.
# Presumably that is where the init_paths helper module lives - verify against
# the test tree layout.
sys.path.append(os.fspath(Path(__file__).resolve().parents[1]))
from init_paths import init_test_paths
# NOTE(review): init_test_paths is defined outside this file; confirm there
# what the False argument selects.
init_test_paths(False)
# These imports deliberately come after init_test_paths() has run; presumably
# the call adjusts the import paths so the PySide6 build under test is found
# (TODO confirm).
import asyncio
import PySide6.QtAsyncio as QtAsyncio
@unittest.skipIf(sys.version_info < (3, 11), "Requires ExceptionGroup")
class QAsyncioTestCaseCancelTaskGroup(unittest.TestCase):
    """
    PYSIDE-2644: If a task was cancelled, then a new future created from this
    task should be cancelled as well. Otherwise, in some scenarios like a loop
    inside the task and with bad timing, if the new future is not cancelled,
    the task would continue running in this loop despite having been cancelled.
    This bad timing can occur especially if the first future finishes very
    quickly.
    """

    def setUp(self) -> None:
        """Reset the completion flag before every test."""
        super().setUp()
        # We only reach the end of the loop if the task is not cancelled.
        # The attribute name must match the one written by the loop_*
        # coroutines and read by test_cancel_taskgroup.
        self._loop_end_reached = False

    async def raise_error(self):
        """Fail immediately with RuntimeError; used as the failing sibling task."""
        raise RuntimeError

    async def loop_short(self):
        """Await 1000 sleeps of 1e-3 s, then flag that the loop ran to its end."""
        self._loop_end_reached = False
        for _ in range(1000):
            await asyncio.sleep(1e-3)
        self._loop_end_reached = True

    async def loop_shorter(self):
        """Await 1000 sleeps of 1e-4 s, then flag that the loop ran to its end."""
        self._loop_end_reached = False
        for _ in range(1000):
            await asyncio.sleep(1e-4)
        self._loop_end_reached = True

    async def loop_the_shortest(self):
        """Await 1000 no-op calls via asyncio.to_thread, then flag completion."""
        self._loop_end_reached = False
        for _ in range(1000):
            await asyncio.to_thread(lambda: None)
        self._loop_end_reached = True

    async def main(self, coro):
        """Run ``coro()`` and ``raise_error()`` as tasks of one TaskGroup.

        ``coro`` is a coroutine function (one of the ``loop_*`` methods), not
        an already created coroutine object.
        """
        async with asyncio.TaskGroup() as tg:
            tg.create_task(coro())
            tg.create_task(self.raise_error())

    def test_cancel_taskgroup(self):
        """No loop variant may run to its end once the sibling task has failed."""
        coros = [self.loop_short, self.loop_shorter, self.loop_the_shortest]

        for coro in coros:
            # Report each loop variant separately so a failure names the
            # coroutine that kept running.
            with self.subTest(coro=coro.__name__):
                try:
                    QtAsyncio.run(self.main(coro), keep_running=False)
                except ExceptionGroup as e:  # noqa: F821
                    # Only the RuntimeError from raise_error() is expected in
                    # the group.
                    self.assertEqual(len(e.exceptions), 1)
                    self.assertIsInstance(e.exceptions[0], RuntimeError)
                self.assertFalse(self._loop_end_reached)
# Run the test cases of this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()
|