If you are using Python 3.5 or newer, you can use the async/await syntax.
import asyncio
import multiprocessing
import random
import time
from concurrent.futures import ProcessPoolExecutor
# Number of pool results handled so far; incremented by got_result().
count_received = 0
# Number of jobs submitted to the pool; got_result() stops the event loop
# once count_received reaches this value.
tests_count = 16
async def AWS_KMS_call(i, text, action='encrypt'):
    """Stand-in for an AWS KMS request.

    Prints the job index ``i``, the name of the current process and
    ``action``, waits for a random fraction of a second, and returns
    ``text`` reversed.
    """
    worker_name = multiprocessing.current_process().name
    print(i, worker_name, action)
    delay = random.random()
    await asyncio.sleep(delay)  # emulate some work
    reversed_text = text[::-1]
    return reversed_text
async def encrypt(i, text):
    """Run the 'encrypt' action on ``text`` for job ``i``.

    Prints a progress line with the encrypted value and returns that value.
    """
    ciphertext = await AWS_KMS_call(i, text, action='encrypt')
    worker_name = multiprocessing.current_process().name
    print(i, worker_name, ciphertext, 'encrypted')
    return ciphertext
async def decrypt(i, text):
    """Encrypt ``text`` for job ``i``, then decrypt it again.

    Returns the value produced by the 'decrypt' action.
    """
    ciphertext = await encrypt(i, text)
    plaintext = await AWS_KMS_call(i, ciphertext, action='decrypt')
    worker_name = multiprocessing.current_process().name
    # The progress line shows the encrypted value that was just decrypted.
    print(i, worker_name, ciphertext, 'decrypted')
    return plaintext
def pool_func(i, text):
    """Run the decrypt() coroutine to completion on a private event loop.

    This is the function submitted to the process pool. It creates a new
    event loop, installs it as the current loop, and drives
    ``decrypt(i, text)`` on it.

    Args:
        i: Job index; echoed in the progress output and in the result.
        text: The value passed to decrypt().

    Returns:
        A ``(i, decrypted)`` tuple.
    """
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        decrypted = loop.run_until_complete(decrypt(i, text))
    finally:
        # A new loop is created on every call, so close it here; otherwise
        # each call would leave an open loop (and its resources) behind.
        loop.close()
    print(i, multiprocessing.current_process().name, decrypted, 'pool_func exit')
    return i, decrypted
def got_result(future):
    """Done-callback for a pool job: report its result and count it.

    Unpacks the ``(i, text)`` pair from ``future.result()`` and prints it.
    Once as many results have been counted as ``tests_count``, stops the
    module-level event loop ``loop``.
    """
    global count_received
    # The counter is bumped before result() is read, so the call is counted
    # even if reading the result raises.
    count_received = count_received + 1
    i, text = future.result()
    process_name = multiprocessing.current_process().name
    print(i, process_name, text, 'done')
    if count_received != tests_count:
        return
    loop.stop()
if __name__ == '__main__':
    # Submit tests_count jobs to a process pool and run the event loop until
    # got_result() has seen every result and stops it.
    text = 'some_data_to_encrypt'
    executor = ProcessPoolExecutor(16)
    # Create the loop explicitly: asyncio.get_event_loop() without a current
    # loop is deprecated and raises RuntimeError on recent Python versions.
    # got_result() stops this loop through the module-level name `loop`.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        for i in range(tests_count):
            task = asyncio.ensure_future(
                loop.run_in_executor(executor, pool_func, i, text))
            task.add_done_callback(got_result)
        # With no jobs scheduled nothing would ever call loop.stop(), so
        # only block in run_forever() when there is something to wait for.
        if tests_count:
            loop.run_forever()
    finally:
        # Release the worker processes before tearing the loop down.
        executor.shutdown()
        loop.close()
I added several print statements for clarity; the output looks like this:
0 Process-1 encrypt
1 Process-3 encrypt
2 Process-5 encrypt
4 Process-2 encrypt
5 Process-4 encrypt
6 Process-6 encrypt
7 Process-9 encrypt
8 Process-7 encrypt
9 Process-10 encrypt
10 Process-8 encrypt
3 Process-15 encrypt
11 Process-11 encrypt
12 Process-12 encrypt
13 Process-14 encrypt
15 Process-13 encrypt
14 Process-16 encrypt
1 Process-3 tpyrcne_ot_atad_emos encrypted
1 Process-3 decrypt
6 Process-6 tpyrcne_ot_atad_emos encrypted
6 Process-6 decrypt
15 Process-13 tpyrcne_ot_atad_emos encrypted
15 Process-13 decrypt
5 Process-4 tpyrcne_ot_atad_emos encrypted
5 Process-4 decrypt
3 Process-15 tpyrcne_ot_atad_emos encrypted
3 Process-15 decrypt
9 Process-10 tpyrcne_ot_atad_emos encrypted
9 Process-10 decrypt
1 Process-3 tpyrcne_ot_atad_emos decrypted
1 Process-3 some_data_to_encrypt pool_func exit
1 MainProcess some_data_to_encrypt done
4 Process-2 tpyrcne_ot_atad_emos encrypted
4 Process-2 decrypt
11 Process-11 tpyrcne_ot_atad_emos encrypted
11 Process-11 decrypt
10 Process-8 tpyrcne_ot_atad_emos encrypted
10 Process-8 decrypt
10 Process-8 tpyrcne_ot_atad_emos decrypted
10 Process-8 some_data_to_encrypt pool_func exit
10 MainProcess some_data_to_encrypt done
6 Process-6 tpyrcne_ot_atad_emos decrypted
6 Process-6 some_data_to_encrypt pool_func exit
6 MainProcess some_data_to_encrypt done
8 Process-7 tpyrcne_ot_atad_emos encrypted
8 Process-7 decrypt
12 Process-12 tpyrcne_ot_atad_emos encrypted
12 Process-12 decrypt
2 Process-5 tpyrcne_ot_atad_emos encrypted
2 Process-5 decrypt
15 Process-13 tpyrcne_ot_atad_emos decrypted
15 Process-13 some_data_to_encrypt pool_func exit
15 MainProcess some_data_to_encrypt done
11 Process-11 tpyrcne_ot_atad_emos decrypted
11 Process-11 some_data_to_encrypt pool_func exit
11 MainProcess some_data_to_encrypt done
13 Process-14 tpyrcne_ot_atad_emos encrypted
13 Process-14 decrypt
4 Process-2 tpyrcne_ot_atad_emos decrypted
4 Process-2 some_data_to_encrypt pool_func exit
4 MainProcess some_data_to_encrypt done
7 Process-9 tpyrcne_ot_atad_emos encrypted
7 Process-9 decrypt
0 Process-1 tpyrcne_ot_atad_emos encrypted
0 Process-1 decrypt
14 Process-16 tpyrcne_ot_atad_emos encrypted
14 Process-16 decrypt
8 Process-7 tpyrcne_ot_atad_emos decrypted
8 Process-7 some_data_to_encrypt pool_func exit
8 MainProcess some_data_to_encrypt done
5 Process-4 tpyrcne_ot_atad_emos decrypted
5 Process-4 some_data_to_encrypt pool_func exit
5 MainProcess some_data_to_encrypt done
3 Process-15 tpyrcne_ot_atad_emos decrypted
3 Process-15 some_data_to_encrypt pool_func exit
3 MainProcess some_data_to_encrypt done
9 Process-10 tpyrcne_ot_atad_emos decrypted
9 Process-10 some_data_to_encrypt pool_func exit
9 MainProcess some_data_to_encrypt done
13 Process-14 tpyrcne_ot_atad_emos decrypted
13 Process-14 some_data_to_encrypt pool_func exit
13 MainProcess some_data_to_encrypt done
2 Process-5 tpyrcne_ot_atad_emos decrypted
2 Process-5 some_data_to_encrypt pool_func exit
2 MainProcess some_data_to_encrypt done
0 Process-1 tpyrcne_ot_atad_emos decrypted
0 Process-1 some_data_to_encrypt pool_func exit
0 MainProcess some_data_to_encrypt done
12 Process-12 tpyrcne_ot_atad_emos decrypted
12 Process-12 some_data_to_encrypt pool_func exit
12 MainProcess some_data_to_encrypt done
14 Process-16 tpyrcne_ot_atad_emos decrypted
14 Process-16 some_data_to_encrypt pool_func exit
14 MainProcess some_data_to_encrypt done
7 Process-9 tpyrcne_ot_atad_emos decrypted
7 Process-9 some_data_to_encrypt pool_func exit
7 MainProcess some_data_to_encrypt done
Also, I suggest you look into the Celery library. It may fit your requirements better.
Concurrent vs. parallel: Parallel means that everything is happening in different OS-level threads or processes at the exact same time. Concurrent means that any time something blocks, another task is run asynchronously. In Python (CPython), threads cannot execute Python code in parallel because the GIL prevents that. Because your code is going to be CPU bound (encryption/decryption requires computation), you should take a parallel approach, i.e. use multiple processes. If your code were IO bound (network IO, disk IO, etc.), it would be better to use asyncio.