diff options
| author | Adrian Herrmann <adrian.herrmann@qt.io> | 2024-07-03 18:36:20 +0200 |
|---|---|---|
| committer | Adrian Herrmann <adrian.herrmann@qt.io> | 2024-07-16 06:32:02 +0200 |
| commit | 526bc12e42db7c6305bcc28ad8f6b7554597d725 (patch) | |
| tree | a98f909fdf5c893bfe5d2937e0246acdae688dea /sources/pyside6/tests/QtAsyncio/qasyncio_test_uncancel.py | |
| parent | 32c36073e212e7ca17b1f7578d61195aefd9dbca (diff) | |
QtAsyncio: Add cancel count and uncancel
Implement the QAsyncioTask.uncancel() function and the associated cancel
count.
Note to reader: Unlike what the name suggests, the uncancel() function
on its own does not undo a task cancellation. This must be performed by
consuming the CancelledError exception, at which point uncancel() serves
to remove the cancellation state.
Pick-to: 6.7
Task-number: PYSIDE-769
Fixes: PYSIDE-2790
Change-Id: I4e817e1dd3f49179855432d20ed2f043090fd8f1
Reviewed-by: Shyamnath Premnadh <Shyamnath.Premnadh@qt.io>
Reviewed-by: Friedemann Kleint <Friedemann.Kleint@qt.io>
Diffstat (limited to 'sources/pyside6/tests/QtAsyncio/qasyncio_test_uncancel.py')
| -rw-r--r-- | sources/pyside6/tests/QtAsyncio/qasyncio_test_uncancel.py | 61 |
1 files changed, 61 insertions, 0 deletions
diff --git a/sources/pyside6/tests/QtAsyncio/qasyncio_test_uncancel.py b/sources/pyside6/tests/QtAsyncio/qasyncio_test_uncancel.py new file mode 100644 index 000000000..036622845 --- /dev/null +++ b/sources/pyside6/tests/QtAsyncio/qasyncio_test_uncancel.py @@ -0,0 +1,61 @@ +# Copyright (C) 2024 The Qt Company Ltd. +# SPDX-License-Identifier: LicenseRef-Qt-Commercial OR GPL-3.0-only WITH Qt-GPL-exception-1.0 +from __future__ import annotations + +"""Test cases for QtAsyncio""" + +import unittest +import asyncio + +import PySide6.QtAsyncio as QtAsyncio + + +class QAsyncioTestCaseUncancel(unittest.TestCase): + """ https://superfastpython.com/asyncio-cancel-task-cancellation """ + + async def worker(self, outputs: list[str]): + # Ensure the task always gets done. + while True: + try: + await asyncio.sleep(2) + outputs.append("Task sleep completed normally") + break + except asyncio.CancelledError: + outputs.append("Task is cancelled, ignore and try again") + asyncio.current_task().uncancel() + + async def main(self, outputs: list[str]): + task = asyncio.create_task(self.worker(outputs)) + # Allow the task to run briefly. + await asyncio.sleep(0.5) + task.cancel() + try: + await task + except asyncio.CancelledError: + outputs.append("Task was cancelled") + + cancelling = task.cancelling() + self.assertEqual(cancelling, 0) + outputs.append(f"Task cancelling: {cancelling}") + + cancelled = task.cancelled() + self.assertFalse(cancelled) + outputs.append(f"Task cancelled: {cancelled}") + + done = task.done() + self.assertTrue(done) + outputs.append(f"Task done: {done}") + + def test_uncancel(self): + outputs_expected = [] + outputs_real = [] + + asyncio.run(self.main(outputs_real)) + QtAsyncio.run(self.main(outputs_expected), keep_running=False) + + self.assertIsNotNone(outputs_real) + self.assertEqual(outputs_real, outputs_expected) + + +if __name__ == "__main__": + unittest.main() |
