diff options
| author | Adrian Herrmann <adrian.herrmann@qt.io> | 2023-09-12 01:19:33 +0200 |
|---|---|---|
| committer | Adrian Herrmann <adrian.herrmann@qt.io> | 2023-09-12 11:55:32 +0200 |
| commit | a53379153abe1fc3b1ac5625714c6002510c6d73 (patch) | |
| tree | bea6c72fb2ca4d6cea770ddc5fcf88a051c7a785 /sources/pyside6/tests/QtAsyncio/qasyncio_test_queues.py | |
| parent | 971c5944415cd557112ff046c4b0c41e8b0d5b05 (diff) | |
QtAsyncio: Add queues test
Add a test for the asyncio queue for consumer/producer scenarios.
Additionally, fix a few bugs exposed by this test through the increased
code coverage.
Task-number: PYSIDE-769
Change-Id: I18e3be6d059b758868a7598b58704db216bcdcc8
Reviewed-by: Friedemann Kleint <Friedemann.Kleint@qt.io>
Diffstat (limited to 'sources/pyside6/tests/QtAsyncio/qasyncio_test_queues.py')
| -rw-r--r-- | sources/pyside6/tests/QtAsyncio/qasyncio_test_queues.py | 68 |
1 files changed, 68 insertions, 0 deletions
diff --git a/sources/pyside6/tests/QtAsyncio/qasyncio_test_queues.py b/sources/pyside6/tests/QtAsyncio/qasyncio_test_queues.py new file mode 100644 index 000000000..38827b0f7 --- /dev/null +++ b/sources/pyside6/tests/QtAsyncio/qasyncio_test_queues.py @@ -0,0 +1,68 @@ +# Copyright (C) 2023 The Qt Company Ltd. +# SPDX-License-Identifier: LicenseRef-Qt-Commercial OR GPL-3.0-only WITH Qt-GPL-exception-1.0 + +'''Test cases for QtAsyncio''' + +import unittest +import asyncio +import random +import time + +from PySide6.QtAsyncio import QAsyncioEventLoopPolicy + + +class QAsyncioTestCaseQueues(unittest.TestCase): + + async def produce(self, output, queue): + for _ in range(random.randint(0, 3)): + await asyncio.sleep(random.randint(0, 2)) + await queue.put(self.i) + output += f"{self.i} added to queue\n" + self.i += 1 + + async def consume(self, output, queue): + while True: + await asyncio.sleep(random.randint(0, 2)) + i = await queue.get() + output += f"{i} pulled from queue\n" + queue.task_done() + + async def main(self, output1, output2, num_producers, num_consumers): + self.i = 0 + queue = asyncio.Queue() # type: asyncio.Queue + producers = [asyncio.create_task(self.produce(output1, queue)) for _ in range(num_producers)] + consumers = [asyncio.create_task(self.consume(output2, queue)) for _ in range(num_consumers)] + await asyncio.gather(*producers) + await queue.join() + for consumer in consumers: + consumer.cancel() + + def test_queues(self): + args = [(2, 3), (2, 1)] + for arg in args: + outputs_expected1 = [] + outputs_expected2 = [] + outputs_real1 = [] + outputs_real2 = [] + + # Run the code without QAsyncioEventLoopPolicy + random.seed(17) + start = time.perf_counter() + asyncio.set_event_loop_policy(asyncio.DefaultEventLoopPolicy()) + asyncio.run(self.main(outputs_expected1, outputs_expected2, *arg)) + end_expected = time.perf_counter() - start + + # Run the code with QAsyncioEventLoopPolicy and QtEventLoop + random.seed(17) + start = time.perf_counter() + asyncio.set_event_loop_policy(QAsyncioEventLoopPolicy()) + asyncio.run(self.main(outputs_real1, outputs_real2, *arg)) + end_real = time.perf_counter() - start + + self.assertEqual(outputs_expected1, outputs_real1) + self.assertEqual(outputs_expected2, outputs_real2) + self.assertAlmostEqual(end_expected, end_real, delta=1) + + +if __name__ == "__main__": + unittest.main() |
