I have a Python method that uses 4 threads. I need to process some data in these 4 threads, wait until all of the threads have finished processing, and then proceed.
The issue is that the script only sometimes works as expected: it processes the data in the threads, then proceeds with the code after `thread.join()`. Other times, at random, the method stops at `thread.join()` and never gets past it. I'm stuck trying to find the exact cause and fix it.
Here's my code
def check_gaze(self) -> List[List[int]]:
    """Run ``process_files`` on four file groups in parallel threads, then post-process.

    ``file_groups[0]`` .. ``file_groups[3]`` are each handled by their own
    ``threading.Thread``. The method blocks until all four threads have been
    joined and then returns ``self._gaze_postprocess()``.

    NOTE(review): ``file_groups`` is produced by the elided code (the ``.``
    lines). The ``List[List[int]]`` return annotation has not been checked
    against what ``_gaze_postprocess()`` actually returns; confirm.
    """
    folder_path = self._gaze_preprocess()
    .
    .
    .
    def process_files(files):
        # Worker run by each thread: returns the ``output`` list built by the
        # (elided) per-file body.
        output = []
        for _file in files:
            try:
                ...
            except:
                # NOTE(review): bare ``except`` swallows every error (even
                # KeyboardInterrupt) and prints no detail, so failures inside
                # the per-file body are invisible.
                print("error")
        return output
    # One result list per thread; each thread extends only its own list.
    r1, r2, r3, r4 = [], [], [], []
    t1 = threading.Thread(target=lambda: r1.extend(process_files(file_groups[0])))
    t2 = threading.Thread(target=lambda: r2.extend(process_files(file_groups[1])))
    t3 = threading.Thread(target=lambda: r3.extend(process_files(file_groups[2])))
    t4 = threading.Thread(target=lambda: r4.extend(process_files(file_groups[3])))
    threads = [t1, t2, t3, t4]
    print("before start")
    for thread in threads:
        thread.start()
    print("after start")
    print("before join")
    # Thread.join() blocks until that thread's target returns. If "after join"
    # never prints, at least one process_files(...) call never returned: it is
    # hung, or, if the whole process exited, presumably it crashed in native code.
    for t in threads:
        t.join()
    print("after join")
    # Merge the results from all four threads.
    # NOTE(review): nothing is written to a CSV here, and ``output`` is not used
    # after this line in the visible code.
    output = r1 + r2 + r3 + r4
    data = self._gaze_postprocess()
    return data
# Driver: build a Gaze instance and call check_gaze(), printing markers before
# and after the call so that a hang inside check_gaze() shows up in the output.
obj = Gaze()
print("pre call")
gaze_output = obj.check_gaze()
print("post call")
Here's the output for the issue:
pre call
before start
after start
before join
Here's the terminal output (it lacks some debug print statements, but I have checked that the program does not go beyond `thread.join()`):
Edit: Using Louis Lac's solution from their answer posted below, I'm still facing the same issue. Based on my debugging so far, I can say that the issue is somewhere in the gaze detection package I'm using (`eye_game`), because if I use dummy code in its place, I don't get the issue anymore!
code:
from concurrent.futures import ThreadPoolExecutor
from utils import Utils
from pathlib import Path
import time
import eye_game
def process_files(files: list) -> list:
    """Run gaze detection on each image file and collect one row per file.

    Each row is ``[frame_number, gaze, gaze]``. ``frame_number`` is the file
    name up to its first ``"."``. If ``eye_game.get_gaze_direction`` raises,
    the exception is printed and the row becomes
    ``[frame_number, 'No face', 'No face']``. After every file, the rows
    accumulated so far are printed (debug output).

    Args:
        files: Image paths (str or path-like) to process.

    Returns:
        One row per input file, in input order.

    Raises:
        TypeError: if an entry of ``files`` is not a valid path. Previously
            this was swallowed: the row was reported under the previous
            file's frame number, or the error became UnboundLocalError.
    """
    output = []
    for file_ in files:
        # Derive the frame number outside the ``try``. A bad path then cannot
        # leave ``frame_number`` unbound, or stale from the previous file, in
        # the fallback row below.
        frame_number = Path(file_).name.split(".")[0]
        try:
            gaze = eye_game.get_gaze_direction(file_)
        except Exception as e:
            # Best effort: a failed detection is recorded as 'No face'
            # instead of aborting the whole group.
            print(e)
            output.append([frame_number, 'No face', 'No face'])
        else:
            output.append([frame_number, gaze, gaze])
        print(f'Output is : {output}')
    return output
def check_gaze(file_groups: list[list]) -> list:
    """Map ``process_files`` over the file groups using a 4-thread pool.

    Returns one ``process_files`` result per group, in the same order as
    ``file_groups``. The pool is shut down (all workers joined) before
    returning.

    NOTE(review): this threaded version was reported to hang when
    ``process_files`` calls into ``eye_game`` from several threads at once.
    """
    with ThreadPoolExecutor(max_workers=4) as executor:
        return list(executor.map(process_files, file_groups))
def main():
    """Run gaze detection on every ``*.jpg`` in ``test_9/img`` and print timing.

    The absolute image paths are split into groups by
    ``Utils().gaze_util_round_robin_sublists`` (presumably round-robin, per
    its name). They are processed by ``check_gaze``, and the results plus the
    elapsed wall-clock minutes are printed.
    """
    started_at = time.time()
    image_paths = [str(p.resolve()) for p in Path('test_9/img').glob("*.jpg")]
    file_groups = Utils().gaze_util_round_robin_sublists(image_paths)
    results = check_gaze(file_groups)
    print(results)
    minutes = (time.time() - started_at) / 60
    print(f'The script took {minutes} minutes')


if __name__ == "__main__":
    main()
Solution
I fixed my code by following Lie Ryan's answer below and using a process pool (`ProcessPoolExecutor`) instead of a thread pool (`ThreadPoolExecutor`). I use multiprocessing here instead of multithreading because the problem with threads comes from the `eye_game` package. The package is open source, but I haven't had time to go through it and find exactly where the issues arise.
Here's my solution.
from concurrent.futures import ThreadPoolExecutor
from utils import Utils
from pathlib import Path
import time
import eye_game
def process_files(files: list) -> list:
    """Run gaze detection on each image file and collect one row per file.

    Each row is ``[frame_number, gaze, gaze]``. ``frame_number`` is the file
    name up to its first ``"."``. If ``eye_game.get_gaze_direction`` raises,
    the exception is printed and the row becomes
    ``[frame_number, 'No face', 'No face']``.

    Args:
        files: Image paths (str or path-like) to process.

    Returns:
        One row per input file, in input order.

    Raises:
        TypeError: if an entry of ``files`` is not a valid path. Previously
            this was swallowed: the row was reported under the previous
            file's frame number, or the error became UnboundLocalError.
    """
    output = []
    for file_ in files:
        # Derive the frame number outside the ``try``. A bad path then cannot
        # leave ``frame_number`` unbound, or stale from the previous file, in
        # the fallback row below.
        frame_number = Path(file_).name.split(".")[0]
        try:
            gaze = eye_game.get_gaze_direction(file_)
        except Exception as e:
            # Best effort: a failed detection is recorded as 'No face'
            # instead of aborting the whole group.
            print(e)
            output.append([frame_number, 'No face', 'No face'])
        else:
            output.append([frame_number, gaze, gaze])
    return output
def check_gaze(file_groups: list[list], *, max_workers: int = 4) -> list:
    """Map ``process_files`` over the file groups using a pool of worker processes.

    A process pool is used instead of a thread pool. The thread-pool version
    of this function was observed to hang inside ``eye_game``; giving each
    worker its own process avoids that. NOTE(review): the root cause inside
    ``eye_game`` has not been identified.

    Requirements of a process pool:
      * ``process_files`` must stay a module-level (picklable) function.
      * The script entry point must stay behind ``if __name__ == "__main__":``
        so that workers started with the "spawn" method don't re-run it.
      * The file groups and the returned rows must be picklable.
        NOTE(review): this assumes ``eye_game``'s gaze values are plain
        data; confirm.

    Args:
        file_groups: One list of image paths per unit of work.
        max_workers: Number of worker processes (default 4, as before).

    Returns:
        One ``process_files`` result per group, in the same order as
        ``file_groups``.
    """
    # Local import keeps this fix self-contained; the module header only
    # imports ThreadPoolExecutor.
    from concurrent.futures import ProcessPoolExecutor

    with ProcessPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(process_files, file_groups))
def _debug_main():
    """Run gaze detection on every ``*.jpg`` in ``path-to-directory`` and print timing.

    The absolute image paths are split into groups by
    ``Utils().gaze_util_round_robin_sublists`` (presumably round-robin, per
    its name). They are processed by ``check_gaze``, and the results plus the
    elapsed wall-clock minutes are printed.
    """
    started_at = time.time()
    image_paths = [str(p.resolve()) for p in Path('path-to-directory').glob("*.jpg")]
    file_groups = Utils().gaze_util_round_robin_sublists(image_paths)
    results = check_gaze(file_groups)
    print(results)
    minutes = (time.time() - started_at) / 60
    print(f'The script took {minutes} minutes')


if __name__ == "__main__":
    _debug_main()


`Gaze` instance. It presumably then entered `obj.check_gaze()`, but you've shown no proof that `check_gaze` ever created, started, or joined any threads. Why do you think the program terminated in one of the `t.join()` calls? If it really is stuck in a `t.join()` call, that means one of the `process_files(...)` calls is hanging.