5

I have a numpy array of coordinates of size n_slice x 2048 x 3, where n_slice is in the tens of thousands. I want to apply the following operation on each 2048 x 3 slice separately

import numpy as np
from scipy.spatial.distance import pdist

# load coor from a binary xyz file, dcd format

n_slice, n_coor, _ = coor.shape
r = np.arange(n_coor)
dist = np.zeros([n_slice, n_coor, n_coor])

# this loop is what I want to parallelize, each slice is completely independent
for i in xrange(n_slice): 
    dist[i, r[:, None] < r] = pdist(coor[i])

I tried using Dask by making coor a dask.array,

import dask.array as da
dcoor = da.from_array(coor, chunks=(1, 2048, 3))

but simply replacing coor by dcoor will not expose the parallelism. I could see setting up parallel threads to run for each slice but how do I leverage Dask to handle the parallelism?

Here is the parallel implementation using concurrent.futures

import concurrent.futures
import multiprocessing

n_cpu = multiprocessing.cpu_count()

def get_dist(coor, dist, r):
    dist[r[:, None] < r] = pdist(coor)

# load coor from a binary xyz file, dcd format

n_slice, n_coor, _ = coor.shape
r = np.arange(n_coor)
dist = np.zeros([n_slice, n_coor, n_coor])

with concurrent.futures.ThreadPoolExecutor(max_workers=n_cpu) as executor:
    for i in xrange(n_slice):
        executor.submit(get_dist, cool[i], dist[i], r)

It is possible this problem is not well suited to Dask since there are no inter-chunk computations.

1

1 Answer 1

6

map_blocks

The map_blocks method may be helpful:

dcoor.map_blocks(pdist)

Uneven arrays

It looks like you're doing a bit of fancy slicing to insert particular values into particular locations of an output array. This will probably be awkward to do with dask.arrays. Instead, I recommend making a function that produces a numpy array

def myfunc(chunk):
    values = pdist(chunk[0, :, :])
    output = np.zeroes((2048, 2048))
    r = np.arange(2048)
    output[r[:, None] < r] = values
    return output

dcoor.map_blocks(myfunc)

delayed

Worst case scenario you can always use dask.delayed

from dask import delayed, compute
coor2 = delayed(coor)
slices = [coor2[i] for i in range(coor.shape[0])]
slices2 = [delayed(pdist)(slice) for slice in slices]
results = compute(*slices2)
Sign up to request clarification or add additional context in comments.

2 Comments

The output of pdist is an array of the distances between each pair of coordinates in the slice. For my example the result from each slice has dimensions (2048 * 2047 / 2,). These arrays are actually sufficient. I think this answer provides a good start though I think map_blocks is trying to execute pdist as if I passed in the entire 3D array. I only want it to calculate on the 2D slices, which I made into chunks; there should be no cross-chunk calculations.
It calls pdist on each (1, 2048, 3) sized chunk. I would wrap your pdist function in something that takes out the first dimension and then returns the full square matrix. Edited my answer above.

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.