I am building a simple example to understand how dask distributed can distribute python scripts on a HPC cluster. The method to distribute is a basic operation, that writes a file on disk. This script works fine when run from command line ($python simple-function.py)
import os
import argparse
import time
def inc(x):
time.sleep(1)
return x + 1
def get_args():
"""
Get args
"""
parser = argparse.ArgumentParser()
parser.add_argument("x", help="Value")
args = parser.parse_args()
x = args.x
return int(x)
if __name__ == "__main__":
x = get_args()
print("{0} + 1 = {1}".format(x, inc(x)))
with open("results.txt", 'w') as file:
file.write(str(x) + '\n')
Now, I have created another python script that will distribute this code. The idea is to use subprocess and client.map or client.submit to launch multiple instances of the above script. The problem I run into is that the output .txt file is not written when using any of the methods below (client.map, client.submit then gather or compute or .result()). Maybe I'm not using the correct method ?
import os
import time
import subprocess
import yaml
import argparse
import dask
from dask.distributed import Client
from dask_jobqueue import PBSCluster
def create_cluster():
cluster = PBSCluster(
cores=4,
memory="20GB",
interface="ib0",
queue="qdev",
processes=4,
nanny=True,
walltime="12:00:00",
shebang="#!/bin/bash",
local_directory="$TMPDIR"
)
cluster.scale(4)
time.sleep(10) # Wait for workers
return cluster
def get_args():
parser = argparse.ArgumentParser()
parser.add_argument("script", help="Script to distribute")
parser.add_argument("nodes", type=int, help="Number of nodes")
args = parser.parse_args()
script = args.script
n_nodes = args.nodes
return script, n_nodes
def close_all(client, cluster):
client.close()
cluster.close()
def methode(script, x):
subprocess.run(["python",
script,
x])
return None
if __name__ == "__main__":
cluster = create_cluster()
client = Client(cluster)
time.sleep(1)
script, n_nodes = get_args() #Get arguments
#With client.submit
futures = []
for n, o in enumerate(range(10)):
futures.append(client.submit(methode, *[script, str(o)], priority=-n))
[f.result() for f in futures]
#Or client.map
L = client.map(methode, *[script, str(range(10))])
client.compute(L)
client.gather(L)
time.sleep(20)
close_all(client, cluster)
On a side note if I execute the following code : dask.compute(methode(args))
Then, the .txt output file is written.
It seems that only the different client methods are not working