I have a function that takes a while to calculate, and has to be iterated >20k times with two varying parameters:

from ipyparallel import Client
import numpy as np

# Parameter grid: 101 x 201 = 20301 combinations
m_array = np.arange(0, 101, 1)
s_array = np.arange(0, 201, 1)
rc = Client()
# stuff and run_simulation are defined elsewhere
rc[:].push(dict(stuff=stuff))
view = rc.load_balanced_view()
async_results = []

# Submit one task per (m, s) pair
for m in m_array:
    for s in s_array:
        chi = view.apply_async(run_simulation, m=m, s=s)
        async_results.append(chi)
rc.wait(async_results)
results = [ar.get() for ar in async_results]

I see there is a wait_interactive method available, however I have not been able to figure out how to use it. What's the best way to print out a status update at some given interval?

Update

I added the all_ids list and the rc.get_result(all_ids).wait_interactive() call.

async_results = []
all_ids = []
for m in m_array:
    for s in s_array:
        chi = view.apply_async(run_simulation, m=m, s=s)
        async_results.append(chi)
        all_ids.extend(chi.msg_ids)
rc.get_result(all_ids).wait_interactive()
rc.wait(async_results)
results = [ar.get() for ar in async_results]

This produces the periodic status updates as expected, but it now also produces a traceback.

---------------------------------------------------------------------------
KeyError                                  Traceback (most recent call last)
<ipython-input-36-85db6ca605cd> in <module>()
    220 rc.get_result(all_ids).wait_interactive()
    221 rc.wait(async_results)
--> 222 results = [ar.get() for ar in async_results]
    223 results = np.array(results)
    224 results.shape = (len(m_array), len(s_array))

//anaconda/lib/python2.7/site-packages/ipyparallel/client/asyncresult.pyc in get(self, timeout)
     95         by get() inside a `RemoteError`.
     96         """
---> 97         if not self.ready():
     98             self.wait(timeout)
     99 

//anaconda/lib/python2.7/site-packages/ipyparallel/client/asyncresult.pyc in ready(self)
    113         """Return whether the call has completed."""
    114         if not self._ready:
--> 115             self.wait(0)
    116         elif not self._outputs_ready:
    117             self._wait_for_outputs(0)

//anaconda/lib/python2.7/site-packages/ipyparallel/client/asyncresult.pyc in wait(self, timeout)
    152                 if self.owner:
    153 
--> 154                     self._metadata = [self._client.metadata.pop(mid) for mid in self.msg_ids]
    155                     [self._client.results.pop(mid) for mid in self.msg_ids]
    156 

KeyError: '884328c8-d768-48d5-b477-a256ebaea7a9'

Are the message IDs or results being cleared out somewhere before the ar.get() method gets to find them?

1 Answer

wait_interactive is a method on AsyncResult objects. It will be a method on the Client itself soon, but is not currently. So to use wait_interactive, you need to build a single AsyncResult that wraps all of your results. The easiest way to do this is to maintain one list of all the msg_ids corresponding to your requests:

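async_results = []
all_ids = []
for m in m_array:
    for s in s_array:
        chi = view.apply_async(run_simulation, m=m, s=s)
        async_results.append(chi)
        # collect every message ID so one AsyncResult can wrap them all
        all_ids.extend(chi.msg_ids)

# owner=False leaves the cached results in place for the later ar.get() calls
rc.get_result(all_ids, owner=False).wait_interactive()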
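If you want full control over the wording or timing of the status line, a manual polling loop is another option. The sketch below only assumes that each task object has a ready() method, as AsyncResult does in the traceback above. The names wait_with_status and FakeTask, and the interval values, are made up for illustration. FakeTask exists only so that the example runs without a cluster.

```python
import sys
import time


def _print_line(message):
    """Default reporter: write one status line to stdout."""
    sys.stdout.write(message + "\n")


def wait_with_status(tasks, interval=1.0, report=_print_line):
    """Block until every task is ready, reporting progress each `interval` seconds.

    `tasks` is any list of objects exposing ready() -> bool.
    Returns the number of status lines that were reported.
    """
    total = len(tasks)
    reports = 0
    while True:
        done = sum(1 for task in tasks if task.ready())
        report("%d/%d tasks finished" % (done, total))
        reports += 1
        if done == total:
            return reports
        time.sleep(interval)


class FakeTask(object):
    """Hypothetical stand-in for a task: not ready for the first `polls` checks."""

    def __init__(self, polls):
        self._remaining = polls

    def ready(self):
        if self._remaining > 0:
            self._remaining -= 1
            return False
        return True


# Demo with stand-in tasks; messages are collected instead of printed.
messages = []
n_reports = wait_with_status(
    [FakeTask(0), FakeTask(2)], interval=0.01, report=messages.append
)
```

With a real cluster you would presumably call wait_with_status(async_results, interval=5) in place of the wait_interactive line, then collect the values with ar.get() as before.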
2 Comments

I added the all_ids list and the rc.get_result().wait_interactive line, but am now getting another traceback. See the update above.
Added the missing owner=False argument, which may be needed to avoid removing entries from the cache.
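To make the cache behaviour concrete, here is a toy model of it. This is not ipyparallel's code. ToyClient and ToyResult are hypothetical classes that mirror only the "pop if owner" branch visible in the traceback, where an owning result removes its entries from the client's cache once it has them.

```python
class ToyResult(object):
    """Hypothetical result handle that reads values from a shared client cache."""

    def __init__(self, client, msg_ids, owner):
        self._client = client
        self.msg_ids = list(msg_ids)
        self.owner = owner
        self._values = None

    def get(self):
        if self._values is None:
            cache = self._client.results
            if self.owner:
                # An owner takes its entries out of the shared cache.
                self._values = [cache.pop(mid) for mid in self.msg_ids]
            else:
                # A non-owner only reads; the cache is left untouched.
                self._values = [cache[mid] for mid in self.msg_ids]
        return self._values


class ToyClient(object):
    """Hypothetical client holding finished results keyed by message ID."""

    def __init__(self):
        self.results = {}

    def get_result(self, msg_ids, owner=True):
        return ToyResult(self, msg_ids, owner)


client = ToyClient()
client.results.update({"a": 1, "b": 2})
per_task = [ToyResult(client, [mid], owner=True) for mid in ("a", "b")]

# A non-owning combined handle reads without removing anything,
# so the per-task handles can still collect their entries afterwards.
combined_values = client.get_result(["a", "b"], owner=False).get()
per_task_values = [result.get()[0] for result in per_task]

# An owning combined handle empties the cache; a later lookup then fails.
client.results.update({"a": 1, "b": 2})
client.get_result(["a", "b"], owner=True).get()
try:
    ToyResult(client, ["a"], owner=True).get()
    second_lookup = "ok"
except KeyError:
    second_lookup = "KeyError"
```

In this model the second lookup fails in the same way as the KeyError in the question: the combined owning handle has already popped the message IDs that the per-task handle goes looking for.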
