3

I have a Python method with 4 threads. I have to process some data in these 4 threads and wait till all of the threads complete the processing and then proceed ahead.

The issue is, sometimes the script works as expected: it process the threads, then proceeds with the code after thread.join(). However, the method randomly terminates at thread.join(). I'm stuck at finding the exact issue and fixing it.

Here's my code

def check_gaze(self) -> List[List[int]]:
    folder_path = self._gaze_preprocess()

    .
    .
    .
    
    def process_files(files):
        output = []

        for _file in files:
            try:
                ...
            except:
                print("error")

        return output

    r1, r2, r3, r4 = [], [], [], []

    t1 = threading.Thread(target=lambda: r1.extend(process_files(file_groups[0])))
    t2 = threading.Thread(target=lambda: r2.extend(process_files(file_groups[1])))
    t3 = threading.Thread(target=lambda: r3.extend(process_files(file_groups[2])))
    t4 = threading.Thread(target=lambda: r4.extend(process_files(file_groups[3])))

    threads = [t1, t2, t3, t4]
    
    print("before start")

    for thread in threads:
        thread.start()
    
    print("after start")

    print("before join")

    for t in threads:
        t.join()
    
    print("after join")

    # Merge the results from all three threads and save to a CSV file
    output = r1 + r2 + r3 + r4

    data = self._gaze_postprocess()

    return data


obj = Gaze()

print("pre call")

gaze_output = obj.check_gaze()

print("post call")

Here's the output for the issue:

pre call
before start
after start
before join

Here's the terminal output (it lacks some debug print statements but I have checked that program does not go beyond thread.join()

console output


Edit: Using Louis Lac's solution from their answer posted below, I'm still facing the same issue. Based on my debugging so far, I can say that the issue is somewhere in the gaze detection package I'm using, because if I use dummy code in it's place I don't get the issue anymore!

code:

from concurrent.futures import ThreadPoolExecutor
from utils import Utils
from pathlib import Path
import time
import eye_game

def process_files(files: list) -> list:
    output = []

    for file_ in files:
        try:
            frame_number = Path(file_).name.split(".")[0]
            gaze = eye_game.get_gaze_direction(file_)
            output.append([frame_number, gaze, gaze])
            
        except Exception as e:
            print(e)
            output.append([frame_number, 'No face', 'No face'])

    print(f'Output is : {output}')
    return output


def check_gaze(file_groups: list[list]) -> list:
    results = []

    with ThreadPoolExecutor(max_workers=4) as pool:
        outputs = pool.map(process_files, file_groups)
        for result in outputs:
            results.append(result)
    return results


def main():
    start = time.time()
    
    jpg_files = list(Path('test_9/img').glob("*.jpg"))
    jpg_files = [str(f.resolve()) for f in jpg_files]
    
    
    file_groups = Utils().gaze_util_round_robin_sublists(jpg_files)
    
    results = check_gaze(file_groups)
    
    print(results)
    end = time.time()
    print(f'The script took {(end - start) /60} minutes')

if __name__ == "__main__":
    main()

Results output ss


Solution

I fixed my code looking at Lie Ryan's answer below and used a ProcessPool instead of a ThreadPool. The reason for using multiprocessing here instead of multithreading is that the issue with threads is due to the eye_game package. While the code is open source, I haven't had the time to go through it and see where exactly the issues arise.

Here's my solution.

from concurrent.futures import ThreadPoolExecutor
from utils import Utils
from pathlib import Path
import time
import eye_game

def process_files(files: list) -> list:
    output = []

    for file_ in files:
        try:
            frame_number = Path(file_).name.split(".")[0]
            gaze = eye_game.get_gaze_direction(file_)
            
            output.append([frame_number, gaze, gaze])
            
        except Exception as e:
            print(e)
            
            output.append([frame_number, 'No face', 'No face'])

    return output


def check_gaze(file_groups: list[list]) -> list:
    results = []

    with ThreadPoolExecutor(max_workers=4) as pool:
        outputs = pool.map(process_files, file_groups)
        for result in outputs:
            results.append(result)
            
    return results


def _debug_main():
    start = time.time()
    
    jpg_files = list(Path('path-to-directory').glob("*.jpg"))
    jpg_files = [str(f.resolve()) for f in jpg_files]
    file_groups = Utils().gaze_util_round_robin_sublists(jpg_files)
    
    results = check_gaze(file_groups)
    
    print(results)
    
    end = time.time()
    
    print(f'The script took {(end - start) /60} minutes')

if __name__ == "__main__":
    _debug_main()
10
  • Re, "Here's the output..." That only proves that the main thread created a Gaze instance. It presumably then entered obj.check_gaze(), but you've showed no proof that check_gaze ever created any threads or started any threads or joined any threads. Why do you think the program terminated in one of the t.join() calls? Commented Jun 15, 2023 at 18:55
  • @SolomonSlow my bad, I have added some debug print statements in the code above. I have tested the complete method line by line and found out that program terminates at t.join(), it does not go beyond that. Commented Jun 15, 2023 at 19:08
  • Did you maybe mean to say that the program hangs? "Hanging" means that the program just stops doing anything, and it never terminates. Is that what's happening? If so, then it almost certainly means that one of the process_files(...) calls is hanging. Commented Jun 15, 2023 at 19:15
  • @SolomonSlow I have shared a snap of success and fail both scenarios here: imgur.com/a/DJTVIgt The program does not hang per se, it terminates as we can see the terminal (I mean I can immediately use the terminal again it isnt stuck/ hanging) Commented Jun 15, 2023 at 19:21
  • 1
    Maybe I don't understand your screen shot, but it looks as if you started the program twice. The first time, it printed "pre call", then 18 more lines ending with, "post call." The second time all I see is "pre call." Was there more that I'm not seeing? [P.S., adding the actual text of the console output to your question is preferred over posting links to pictures of text. People here don't like links (links go stale), and they don't like pictures (can't be searched.)] Commented Jun 15, 2023 at 19:27

2 Answers 2

3

Few recommendations:

  • use threads as a last resort. In your case you could use a ThreadPoolExecutor instead,
  • do not catch all possible exceptions (except:), usually you only want to catch non-exit exceptions (except Exception:),
  • handle errors inside the catch block instead of just printing it; you can either raise/re-raise an error or return None or an empty list if this is relevant,
  • avoid mutating shared state from a different threads, this can cause to bugs if not handled correctly (for instance if two threads mutate the same shared state at the same time),
  • wrap your main entry point in a if __name__ == "__main__": block as it can cause issues with multiprocessing if not present.

Here is an example which respects these guidelines:

from concurrent.futures import ThreadPoolExecutor
from time import sleep


def process_files(files: list) -> list:
    output = []

    for file_ in files:
        sleep(0.5)
        try:
            output.append(file_)
        except Exception as e:
            print(e)
            raise ValueError("un-expected value received")

    return output


def check_gaze(file_groups: list[list]) -> list:
    results = []

    with ThreadPoolExecutor() as pool:
        outputs = pool.map(process_files, file_groups)
        for result in outputs:
            results.append(result)
    return results


def main():
    file_groups = [["a", "b"], ["c"], ["d", "e", "f"]]
    results = check_gaze(file_groups)
    print(results)


if __name__ == "__main__":
    main()

There is indeed shared states hidden deeply in the face_recognition dependency of the eye_game package. If you dig inside eye_game.get_gaze_direction(), you'll see that deep down the call tree there is a call to face_recognition.face_locations() with accesses a face detector instantiated as a global variable (here). This then calls into a C++ library (dlib) which is likely not thread-safe.

From this observation, you have few solutions to address the issue:

  • Use a lock/mutex around the code using the shared state to avoid concurrent accesses. This will work with treads, however, this defeats the purpose of multi-threading since this will negate any speed gain now that the calls are sequentials.
  • Use multi-processing instead as advised in another answer. However, keep in mind that the global state (the face detector here) will be instantiated multiple time, once per process. This can lead to high memory usage if the model is large and could trigger Out Of Memory (OOM) errors.
Sign up to request clarification or add additional context in comments.

2 Comments

Thankyou very much for the insights! I did try multiple solutions using ThreadPoolExecutor and making a class that inherits from the Thread class however I faced the same issues. My current hypothesis based on long debugging sessions is that the issue lies in gaze_tracking function inside the process_files as I dont get the random behavior if I use dummy code instead. I'll test your code against it on weekend and will post the results here.
Hi. I integrated my code with the code in the answer and unfortunately its still the same issue. Sometimes the script works, other times it just crashes. As for gaze_tracking, it's being done using an open source library (pypi.org/project/eye-game). Essentially, Im storing the images from frames in the video locally, and then dividing them into x batches and then I call the gaze detection function to get the gaze. I'll put the code in my question above. Thanks.
2
+50

Maybe your program crashed with a segfault or core dump? In those cases, these are uncontrolled crash and your program may terminate immediately and there may not be any python-level exception handling.

In pure Python code, uncontrolled crash like that is extremely rare, they're quite unusual, but it is not uncommon to have an uncontrolled crash leading into a segfault/coredump if you call into a native library and there's a bug in that library, or if you passed an argument of the wrong type, or if you call the library in ways that are not thread safe. It seems like the library you're calling here is a software for face tracking, which almost certainly would internally have some native code.

If you replace the multithreading with multiprocessing does the program still crash? If they do, then that may be a bug in the library or there are issues with the arguments that you passed into the library (e.g. it's expecting a string, but you passed in an int).

If the code only crash in multi threaded code, try rewriting your code to use multi processing. Does it still crash in multi processing? If the program crash in multithreading but not multiprocessing, then it's likely that the library is not thread safe. It may have an internal global state and calling it in multithreaded may cause a crash that normally don't happen in single threaded code.

Does the issue still happen if you deep copy your input parameters before you pass it to the thread:

t1 = threading.Thread(
    target=lambda files: 
        r1.extend(process_files(files)
    ),
    args=[deepcopy(file_groups[0])],
)

If deepcopying the input parameters fixes the issue, then there may have been shared mutable state between the input parameters and if the library isn't fully thread safe, that may be the cause of issues.

Note that even code that are read only in Python actually is mutable at native code level due to reference counting, if the library uses the C macros provided by CPython to handle ref counts and if it handles acquiring and releasing GIL correctly, then these should normally not be an issue, but if the library that doesn't, then there is a good chance that it may coredump/segfault.

3 Comments

Awesome insights! I did try multiprocessing using ProcessPoolExecutor previously but it gave some issues so I didnt get much deeper in it. I went back to that approach after reading this answer, fixed those bugs and the random crashes are gone!!
@stuckoverflow could you update your question - showing the bug you fixed that allowed your original solution to run without any crashes. Detailing this information will be helpful alongside this answer to the community if they are searching for solutions similar to what you encountered.
@djmonki Sure, I'll edit and post that in the question. Essentially I just used a process pool instead of a thread pool.

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.