0

I am using Dask to setup a cluster. For now I am setting up both the scheduler and the workers on localhost.

cluster = SSHCluster(["localhost", "localhost"],
                    connect_options={"known_hosts": None},
                    worker_options={"n_workers": params["n_workers"], },
                    scheduler_options={"port": 0, "dashboard_address": ":8797"},)

client = Client(cluster)

Is there a way to create stateful global parameters that can be initialized on the workers' side and be used by any worker_methods that are subsequently assigned to be computed on the workers?

I have found the client.register_worker_plugin method.

def read_only_data(self, jsonfilepath):
    with open(jsonfilepath, "r") as readfile:
        return json.load(read_file)

def main():
    cluster = SSHCluster(params) # simplified
    client = Client(cluster)
    plugin = read_only_data(jsonfilepath)
    client.register_worker_plugin(plugin, name="read-only-data")

However, ReadOnlyData is initialized on the client-side, hence, self.persons and self.persons_len, are copied over to the workers (and not initialized on the workers' side). While this may be useful for small data, if the data set is massive, this will incur additional communication overhead to copy over (unless I am missing something conceptually).

Let's say ReadOnlyData and the file in "jsonfilepath" was available on the workers' side. We could call it from "worker_method_1" and "worker_method_2" that feature some arbitrary logic; however, this means that it would have to be called every time the workers are called. Is there some "initialization" event/method, that happens on the workers' side, right after worker creation, and before the assignment of the worker methods, that would allow us to pre-load some data structures as stateful global parameters, commonly shared among subsequent instances of the worker methods?

Update

After trying @mdurant's suggestions, with a JSON file of 280mb, the code gets stuck on the client.replicate() for over an hour. Loading the file in a single process without Dask, takes less than 20 seconds. In the dashboard, the workers are all using approx 2 GB, and the memory was increasing. There's also network activity being recorded.

Dask Dashboard

All of a sudden, the script crashed with the below:

distributed.comm.core.CommClosedError: in <TCP (closed) ConnectionPool.replicate local=tcp://192.168.0.6:38498 remote=tcp://192.168.0.6:46171>: TimeoutError: [Errno 10] Connection timed out

The memory usage is excessive. The only memory I have is 280 mb for the JSON. 280mb x 6 workers should amount to approx. 1.7gb, and definitely not 2gb on each worker.

I suspect the JSON is being copied to all workers. Dask's documentation also states that the data is copied onto the workers:

replicate -> Set replication of futures within network. Copy data onto many workers. This helps to broadcast frequently accessed data and can improve resilience. This performs a tree copy of the data throughout the network individually on each piece of data. This operation blocks until complete. It does not guarantee replication of data to future workers.

However, still, this does not explain the excessive memory usage and why Dask is not managing to copy 280 mb to 6 workers in less than 1 hour.

5
  • I think if the data is massive it will take longer time reading the data from the source in every worker than transferring pieces of data when they are needed. What dask API you are using? dask dataframes or dask futures? or some other? Commented Sep 15, 2023 at 10:29
  • this is read only data that is widely used in all the worker methods I have. it is not really possible to partition it. I would like to pre-load it directly on the workers, and make it available globally Commented Sep 15, 2023 at 20:19
  • Would distributed.dask.org/en/stable/… do the trick? Else, if using plugin, you'll need to implement the setup function, not just init. Commented Sep 15, 2023 at 20:24
  • I haven't found a single usage example on "register_worker_callbacks" on the whole internet. tried to get it to work but to no avail. tried again with register_plugin, and I noticed that I was probably not using it correct. referred to the documentation, but it seems outdated. certain methods such as client.register_plugin no longer exists in the library. using the suggested setup such as CustomPlugin(WorkerPlugin) with a setup method doesn't seem to work. and in any case, the documentation is not very helpful. is there any fully working example anywhere online? Commented Sep 18, 2023 at 23:15
  • I have updated the question to reflect the only way I managed to get client.register_worker_plugin to work. However, this does not look like any of the examples in the Dask documentation. when debugging, I could access the read_only_data method from the client-side. Is this because this is how the register_worker_plugin method is meant to work, or because it is not using the proper setup? if there is a proper setup, would you be so kind as to direct me to it? Commented Sep 18, 2023 at 23:54

2 Answers 2

1

There are many ways to do this kind of thing, but I would suggest the simplest is also the best. You client-side code may look something like this.

# a function that will run on workers only
def read_only_data(jsonfilepath):
    with open(jsonfilepath, "r") as readfile:
        return jsonload(readfile)

# make the function run on a worker
future = client.submit(readonlydata, mypath)

# have other workers either run the function or copy
# its results asynchronously
client.replicate(future)

# function that does the processing you want
def does_the_work(json_result):
    ...

# the "future" here becomes the result of the JSON
# when this is run by a worker
future2 = client.submit(does_the_work, future)
# now choose to wait on that future or do other stuff

and now use the future in other things. When the future and its dependencies are deallocated (e.g., del future), the data on the workers is freed.

Other options include client.run() (execute a function on all workers imperatively), distributed.Variable and indeed the worker plugin. However remember: if you use normal dask (e.g., delayed), then a task that needs data stored on another worker, it will be copied on demand without any action on your part.

Sign up to request clarification or add additional context in comments.

13 Comments

thanks for this. will try it out soon
can you please let me know how to access the return of read_only_data via the worker? and is read_only_data computed on the worker or on the client?
The function is run on one or more workers. Use the future in another submit or delayed function to use its value on a worker. docs.dask.org/en/latest/futures.html
this is still not clear to me. my requirement was to run a function that loads global parameters on all the workers, upon creation, and make them available to any worker methods that are subsequently run on them. do I understand correctly that you are suggesting calling client.submit and client.replicate from the worker methods? if so, how can I ensure that a subsequent run of the same worker_method does not re-call the function again? and how can I access the future from another worker method instance, context-wise?
I was imagining some kind of "registration" being called from the client, that instructs the workers to run the "read_only_data" function, when the workers are created and before the worker methods are run. then calling something like: get_worker().data["read_only_data"] (which I know does not exist but just to give you an idea) from the worker methods (because read_only_data would have already been called and its return would have already been made available globally)
|
0

The only way that seems to work is something like this:

def read_only_data(dask_worker: Worker, jsonfilepath):
    with open(jsonfilepath, "r") as readfile:
        dask_worker.data["customdata"] = json.load(read_file)

def main():
    cluster = SSHCluster(params) # simplified
    client = Client(cluster)
    callback = partial(read_only_data, jsonfilepath=jsonfilepath)
    client.register_worker_callbacks(callback)

def worker_method():
    worker = get_worker()
    custom_data = worker.data["customdata"]

And from what I can see the read_only_data method is computed on the worker side, as intended. There is an open issue to mark the register_worker_callbacks method as deprecated in favour of the WorkerPlugin, however, I did not manage to work with the WorkerPlugin.

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.