2

I have long running file I/O tasks which I'd like to be able to move into a daemon/server process. A CLI tool would be used to queue new jobs to run, query the status of running jobs, and wait for individual jobs. Python's multiprocessing.managers looks like a nice simple way to handle the IPC. I'd like to be able to construct a SyncManager.Event for the client to wait on without blocking the server, but attempting to do so results in triggers a "server not yet started" assertion. Ironically this assertion gets sent from the server to the client, so obviously the server is started, somewhere.

Here's the minimal example:

#!/usr/bin/env python3
import time
import sys
import concurrent.futures
from multiprocessing.managers import SyncManager

def do_work(files):
    """Simulate doing some work on a set of files."""
    print(f"Starting work for {files}.")
    time.sleep(2)
    print(f"Finished work for {files}.")

# Thread pool to do work in.
pool = concurrent.futures.ProcessPoolExecutor(max_workers=1)

class Job:
    job_counter = 1

    def __init__(self, files):
        """Setup a job and queue work for files on our thread pool."""
        self._job_number = self.job_counter
        Job.job_counter += 1
        print(f"manager._state.value = {manager._state.value}")
        self._finished_event = manager.Event()

        print(f"Queued job {self.number()}.")
        future = pool.submit(do_work, files)
        future.add_done_callback(lambda f : self._finished_event.set())

    def number(self):
        return self._job_number

    def event(self):
        """Get an event which can be waited on for the job to complete."""
        return self._finished_event

class MyManager(SyncManager):
    pass

MyManager.register("Job", Job)

manager = MyManager(address=("localhost", 16000), authkey=b"qca-authkey")
if len(sys.argv) > 1 and sys.argv[1] == "server":

    manager.start()
    print(f"Manager listening at {manager.address}.")

    while True:
        time.sleep(1)
else:
    manager.connect()
    print(f"Connected to {manager.address}.")

    job = manager.Job(["a", "b", "c"])
    job.event().wait()
    print("Done")

If I run the client I see:

$ ./mp-manager.py
Connected to ('localhost', 16000).
Traceback (most recent call last):
  File "./mp-manager.py", line 54, in <module>
    job = manager.Job(["a", "b", "c"])
  File "/usr/lib/python3.8/multiprocessing/managers.py", line 740, in temp
    token, exp = self._create(typeid, *args, **kwds)
  File "/usr/lib/python3.8/multiprocessing/managers.py", line 625, in _create
    id, exposed = dispatch(conn, None, 'create', (typeid,)+args, kwds)
  File "/usr/lib/python3.8/multiprocessing/managers.py", line 91, in dispatch
    raise convert_to_error(kind, result)
multiprocessing.managers.RemoteError: 
---------------------------------------------------------------------------
Traceback (most recent call last):
  File "/usr/lib/python3.8/multiprocessing/managers.py", line 210, in handle_request
    result = func(c, *args, **kwds)
  File "/usr/lib/python3.8/multiprocessing/managers.py", line 403, in create
    obj = callable(*args, **kwds)
  File "./mp-manager.py", line 24, in __init__
    self._finished_event = manager.Event()
  File "/usr/lib/python3.8/multiprocessing/managers.py", line 740, in temp
    token, exp = self._create(typeid, *args, **kwds)
  File "/usr/lib/python3.8/multiprocessing/managers.py", line 622, in _create
    assert self._state.value == State.STARTED, 'server not yet started'
AssertionError: server not yet started
---------------------------------------------------------------------------

The server output is:

$ ./mp-manager.py server
Manager listening at ('127.0.0.1', 16000).
manager._state.value = 0
2
  • Did you solve it? I'm facing the same problem Commented Aug 24, 2022 at 15:27
  • I am no longer working on that project, but I don't recall ever getting SyncManager to do what we wanted, sorry. Commented Aug 29, 2022 at 10:37

0

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.