blob: fd75a1fc711543071fd9a4f200c7e8727e14a458 (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
|
# Copyright (C) 2024 The Qt Company Ltd.
# SPDX-License-Identifier: LicenseRef-Qt-Commercial OR GPL-3.0-only WITH Qt-GPL-exception-1.0
from __future__ import annotations
"""Test cases for QtAsyncio"""
import os
import sys
import unittest
from pathlib import Path
sys.path.append(os.fspath(Path(__file__).resolve().parents[1]))
from init_paths import init_test_paths
init_test_paths(False)
import asyncio
import PySide6.QtAsyncio as QtAsyncio
@unittest.skipIf(sys.version_info < (3, 11), "requires Task.uncancel() (Python 3.11+)")
class QAsyncioTestCaseUncancel(unittest.TestCase):
""" https://superfastpython.com/asyncio-cancel-task-cancellation """
async def worker(self, outputs: list[str]):
# Ensure the task always gets done.
while True:
try:
await asyncio.sleep(2)
outputs.append("Task sleep completed normally")
break
except asyncio.CancelledError:
outputs.append("Task is cancelled, ignore and try again")
asyncio.current_task().uncancel()
async def main(self, outputs: list[str]):
task = asyncio.create_task(self.worker(outputs))
# Allow the task to run briefly.
await asyncio.sleep(0.5)
task.cancel()
try:
await task
except asyncio.CancelledError:
outputs.append("Task was cancelled")
cancelling = task.cancelling()
self.assertEqual(cancelling, 0)
outputs.append(f"Task cancelling: {cancelling}")
cancelled = task.cancelled()
self.assertFalse(cancelled)
outputs.append(f"Task cancelled: {cancelled}")
done = task.done()
self.assertTrue(done)
outputs.append(f"Task done: {done}")
def test_uncancel(self):
outputs_expected = []
outputs_real = []
asyncio.run(self.main(outputs_real))
QtAsyncio.run(self.main(outputs_expected), keep_running=False)
self.assertIsNotNone(outputs_real)
self.assertEqual(outputs_real, outputs_expected)
if __name__ == "__main__":
unittest.main()
|