I have copied all of my code to the working directory on all of my engine machines. The files are:

my_test.py
my_strategy.py
main.py

main.py runs on the client machine. Its contents are:

from ipyparallel import Client

import my_test
import my_strategy as strategy


class Beck_Test_Parallel(object):
    """
    """
    def __init__(self):
        self.rc = None
        self.dview = None

    def start_client(self, path):
        self.rc = Client(path)
        self.dview = self.rc[:]
        #self.dview.push(dict(
        #        Account=my_test.Account, 
        #        dataImport=my_test.dataImport
        #    ))

    def parallel_map(self, deal_function, accounts):
        import my_test
        return self.dview.map_sync(deal_function, accounts)

def create_accounts(time_list, account):
    accounts = []
    for index, time in enumerate(time_list):
        acc = my_test.Account(
                strategy.start, 
                strategy.end, 
                strategy.freq,
                strategy.universe_code, 
                strategy.capital_base, 
                strategy.short_capital, 
                strategy.benchmark, 
                strategy.self_defined
            )
        account.share_data(acc)
        acc.iniData2()
        acc.iniData3()
        acc.current_time = time
        acc.days_counts = index+1
        acc.dynamic_record['capital'] = acc.capital_base
        del acc.connect
        accounts.append(acc)
    return accounts

def let_us_deal(account):
    account =  strategy.handle_data(account) 
    print '   >>>', account.current_time
    return account


if __name__ == '__main__':
    account = my_test.Account(
            strategy.start,
            strategy.end,
            strategy.freq,
            strategy.universe_code,
            strategy.capital_base,
            strategy.short_capital,
            strategy.benchmark,
            strategy.self_defined
        )

    account.iniData()
    account.iniData2()
    account.iniData3()

    time_list = my_test.get_deal_time_list(account)

    accounts = parallel.create_accounts(time_list, account)

    back_test_parallel = parallel.Beck_Test_Parallel()

    back_test_parallel.start_client(
        '/home/fit/.ipython/profile_default/security/ipcontroller-client.json')

    back_test_parallel.dview.execute('import my_test')
    back_test_parallel.dview.execute('import my_strategy as strategy')
    # get the result
    result = back_test_parallel.parallel_map(let_us_deal, accounts)

    for acc in result.get():
        print acc.reselected_stocks, acc.current_time

I import the my_test module inside the parallel_map() method of the class Beck_Test_Parallel, and I also import it on the engines with back_test_parallel.dview.execute('import my_test').

The corresponding modules are in the working directory on each engine machine. I have also copied ipcontroller-client.json and ipcontroller-engine.json to the working directory on the engine machine.

But when it runs, I get ImportError: No module named my_test, even though my_test.py is already in the working directory. This is really frustrating!

---------------------------------------------------------------------------
CompositeError                            Traceback (most recent call last)
/home/fit/log/1027/back_test/main.py in <module>()
    119     import ipdb
    120     ipdb.set_trace()
--> 121     for acc in result.get():
    122         print acc.reselected_stocks, acc.current_time
    123 

/usr/local/lib/python2.7/dist-packages/ipyparallel/client/asyncresult.pyc in get(self, timeout)
    102                 return self._result
    103             else:
--> 104                 raise self._exception
    105         else:
    106             raise error.TimeoutError("Result not ready.")

CompositeError: one or more exceptions from call to method: let_us_deal
[0:apply]: ImportError: No module named my_test
[1:apply]: ImportError: No module named my_test

Some details about result:

In [2]: result
Out[2]: <AsyncMapResult: finished>
In [3]: type(result)
Out[3]: ipyparallel.client.asyncresult.AsyncMapResult

Note that when I run it on a single machine using ipcluster start -n 8, it works fine, without any error.
Thanks in advance.

  • Can you double-check that the CWD for your engines is what you think it is? print(client[:].apply_sync(os.getcwd)) Commented Nov 30, 2015 at 10:11
  • @minrk, thank you, I got the right CWD. Maybe I did not copy the newest ipcontroller-client.json and ipcontroller-engine.json to the engine machine. Thank you for your hard work developing this package. Commented Dec 1, 2015 at 9:40
  • @minrk, I think my CWD was not the right directory, so before the parallel computation I now set the right CWD: import os; dview.map(os.chdir, ['/path/to/my/project/on/engine']*number_of_engines) and it works fine. Thank you. Commented Dec 5, 2015 at 7:47

2 Answers

7

The CWD on my engines was not the directory I expected. You can check the CWD of your engines like this:

>>> import os
>>> print(dview.apply_sync(os.getcwd))  # apply_sync already returns the results, so no .get()

If it is the wrong directory, set the right CWD before the parallel computation so that the ipyparallel engines run in the correct working directory:

>>> import os
>>> number_of_engines = len(dview)  # one path per engine
>>> dview.map(os.chdir, ['/path/to/my/project/on/engine']*number_of_engines)
>>> print(dview.apply_sync(os.getcwd))
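
A slightly more defensive variant, as a minimal sketch: besides changing the CWD, it also puts the project directory on sys.path, so the import should keep working even if the CWD changes again later. The function name prepare_engine is hypothetical, and the sketch assumes the project sits at the same path on every engine. The imports are inside the function so that it does not rely on names already present in the engine's namespace.

```python
def prepare_engine(project_dir):
    """Make project_dir the CWD and importable; return the resulting CWD."""
    # Imports live inside the function so it does not depend on what
    # has already been imported on the engine.
    import os
    import sys

    os.chdir(project_dir)
    if project_dir not in sys.path:
        sys.path.insert(0, project_dir)
    return os.getcwd()


# On the client (placeholder path, as in the snippet above):
# print(dview.apply_sync(prepare_engine, '/path/to/my/project/on/engine'))
```

Run it once after connecting and before calling map_sync.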

You can also check your engines' hostnames with:

>>> import socket
>>> print(dview.apply_sync(socket.gethostname))

And it works fine!
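
The checks above can be combined into one diagnostic call. This is only a sketch: engine_report is a hypothetical helper, not part of ipyparallel. It reports, for each engine, the hostname, the CWD, and whether the given module can be imported there.

```python
def engine_report(module_name):
    """Return hostname, CWD, and whether module_name can be imported here."""
    # Imports are local so the function works on a freshly started engine.
    import os
    import socket

    try:
        __import__(module_name)
        importable = True
    except ImportError:
        importable = False
    return {
        'host': socket.gethostname(),
        'cwd': os.getcwd(),
        'importable': importable,
    }


# On the client:
# for report in dview.apply_sync(engine_report, 'my_test'):
#     print(report)
```

Any engine that reports importable as False is one whose CWD (or sys.path) still needs fixing.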

-3

pip install ipyparallel --upgrade
