
I am trying to call this function [1] in parallel. To do so, I have written the helper function [2], and I call it as shown in [3] and [4]. The problem is that when I execute this code, the execution hangs and I never see the result, but if I run run_simple_job serially, everything works. Why can't I execute this function in parallel? Any advice?

[1] function that I am trying to call

@make_verbose
def run_simple_job(job_params):
    """
    Execute a job remotely and collect the digests.

    The output comes back as a JSON file containing the input and output
    paths and the generated digest.

    :param job_params: (namedtuple) the attributes the job needs during execution:
        client_id (string) id of the client
        command (string) command to execute the job
        cluster (string) where the job will run
        task_type (TypeTask) information about the job that will run
        should_tamper (Boolean) whether this job should tamper with the digests
    :return: output (string) the output of the job execution
    """
    client_id = job_params.client_id
    _command = job_params.command
    cluster = job_params.cluster
    task_type = job_params.task_type

    output = ...  # execute the job (details elided)

    return output
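
For context, job_params is a namedtuple carrying the attributes listed in the docstring. Its actual definition is not shown; a minimal sketch of what it could look like (all names below are assumptions):

from collections import namedtuple

# Hypothetical definition matching the documented attributes;
# the real type is not shown in the question.
JobParams = namedtuple(
    "JobParams",
    ["client_id", "command", "cluster", "task_type", "should_tamper"])

params = JobParams(
    client_id="client-42",
    command="md5sum /data/input",
    cluster="cluster-a",
    task_type=None,        # stands in for a TypeTask instance
    should_tamper=False)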

[2] function that calls in parallel

from itertools import izip
from multiprocessing import Pipe, Process
import logging

def spawn(f):
    # 1 - how do the pipe and x arguments end up here?
    def fun(pipe, x):
        pipe.send(f(x))
        pipe.close()
    return fun

def parmap2(f, X):
    pipe = [Pipe() for x in X]
    # 2 - what is happening with the tuples (c, x) and (p, c)?
    proc = [Process(target=spawn(f), args=(c, x))
            for x, (p, c) in izip(X, pipe)]

    for p in proc:
        logging.debug("Spawn")
        p.start()
    for p in proc:
        logging.debug("Joining")
        p.join()
    return [p.recv() for (p, c) in pipe]

[3] Wrapper class

class RunSimpleJobWrapper:
    """ Wrapper used when running a job """

    def __init__(self, params):
        self.params = params

[4] How I call the function to run in parallel

run_wrapper_list = []
for cluster in clusters:
    task_type = task_type_by_cluster[cluster]
    run_wrapper_list.append(RunSimpleJobWrapper(
        get_job_parameter(client_id, cluster, job.command,
                          majority(FAULTS), task_type)))

jobs_output = parmap2(run_simple_job_wrapper, run_wrapper_list)
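
run_simple_job_wrapper is not shown in the question; presumably it unwraps the wrapper and delegates to run_simple_job, along these lines (a hypothetical sketch):

def run_simple_job_wrapper(wrapper):
    # Hypothetical: forward the wrapped params to run_simple_job.
    return run_simple_job(wrapper.params)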

1 Answer

You could simply use multiprocessing's Pool:

from multiprocessing import Pool

pool = Pool()  # with no argument, Pool uses all available CPUs

param_list = [...]  # generate a list of your parameters

results = pool.map(run_simple_job, param_list)
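
To the inline questions in parmap2: Pipe() returns a pair of connection objects (p, c); the child end c is handed to the worker process together with x via args=(c, x), and the parent keeps p to read the result. That args tuple is how spawn's inner function receives pipe and x.

As for the hang itself, this pipe-based recipe has a known pitfall: the parent joins every child before receiving from the pipes, so if a child's result exceeds the OS pipe buffer, the child blocks in send() and join() never returns. Here is a sketch of the recipe with that order reversed (receive first, then join), reusing spawn from the question; it is untested against your job code:

from itertools import izip
from multiprocessing import Pipe, Process

def parmap2_fixed(f, X):
    pipes = [Pipe() for _ in X]
    proc = [Process(target=spawn(f), args=(c, x))
            for x, (p, c) in izip(X, pipes)]
    for p in proc:
        p.start()
    # Drain the pipes before joining: once every result has been
    # received, no child can still be blocked in send().
    results = [p.recv() for (p, c) in pipes]
    for p in proc:
        p.join()
    return results

Either way, note that pool.map (like the pipe approach) requires run_simple_job and its parameters to be picklable.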