Large arrays computing in parallel

Hi All,

I have a computation involving several large arrays that all have the same length, 5175148. The Best Practices doc says that …

[Screenshot of the relevant passage from the Best Practices doc]

Because of it, I wrote my code as follows.

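def search_neighbors(hexagons_all, h3str):
    # Called once per cell, so there is no loop over h3strs in here.
    # Ring-1 neighbors of the cell, as H3 strings
    L1_neighbors = list(h3.hex_ring(h3str, 1))
    L0_ij = h3.experimental_h3_to_local_ij(origin=h3str, h=h3str)
    # Neighbor (i, j) coordinates relative to the center cell
    L1_ij = np.array([h3.experimental_h3_to_local_ij(origin=h3str, h=L1)
                      for L1 in L1_neighbors]) - L0_ij
    L1_neighbors = [h3.string_to_h3(a) for a in L1_neighbors]
    # Order the neighbors by i first, then by j
    idx = np.lexsort((L1_ij[:, 1], L1_ij[:, 0]))
    # Use the argument rather than a global; it must be sorted for searchsorted
    return np.searchsorted(hexagons_all, L1_neighbors)[idx]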
r, size = 40, len(h3strs_all0)
with MeasureTime(' search_neighbors & slope & aspect'):
#----- test 01
    neighbors = []
    for i in range(r):
        begin = int(i*size/r)
        end = int((i+1)*size/r)
        neighbors.append([
            delayed(search_neighbors)(hexagons_all, delayed(h3str))
            for h3str in h3strs_all[begin:end]
        ])
    neighbors = compute(*neighbors)
    slope, aspect = compute_slope_aspect(zs_all, h3strs, neighbors.T, search_radius)

Note that h3strs_all and hexagons_all are both Delayed objects.

However, this raises an error saying that Delayed objects of unspecified length are not iterable. If I set h3strs_all0 = h3strs_all.compute() and use h3strs_all0 instead of h3strs_all, the code runs, but it is not sped up. The CPU usage reaches 100%, yet the run is way slower than when I don’t use a function at all, which is

with MeasureTime(' search_neighbors & slope & aspect, no dask'):
    neighbors = []
    for h3str in h3strs_all:
        L1_neighbors = list(h3.hex_ring(h3str,1))
        L0_ij = h3.experimental_h3_to_local_ij(origin=h3str, h=h3str)
        L1_ij = np.array([h3.experimental_h3_to_local_ij(origin=h3str, h=L1) 
                          for L1 in L1_neighbors]) - L0_ij
        L1_neighbors = [ h3.string_to_h3(a) for a in L1_neighbors ]    
        idx = np.lexsort((L1_ij[:,1], L1_ij[:,0]))
        neighbors.append(np.searchsorted(hexagons_all0, L1_neighbors)[idx])
    neighbors = np.array(neighbors)
    slope, aspect = compute_slope_aspect(zs_all, h3strs, neighbors.T, search_radius)

Could someone help me to optimize this code, please?

Thanks
cyhsu

Hi cyhsu,

Looping over h3strs_all[begin:end] gives an error because you can’t iterate over a Delayed object. Dask doesn’t know that the Delayed object wraps an array, so it doesn’t know its length. Slicing it only produces another Delayed object. When you first call compute on the Delayed object with h3strs_all0 = h3strs_all.compute(), it is turned into a concrete array again, and you can slice it and iterate over it. At first sight, there seems to be no need to delay h3strs_all at all.
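To make the difference concrete, here is a minimal sketch. The short list of H3-like strings is a made-up stand-in for h3strs_all, not the real data. It shows that slicing a Delayed object is lazy and returns another Delayed object, while iterating over it raises a TypeError:

```python
from dask import delayed

# Hypothetical stand-in for h3strs_all: a Delayed wrapping a plain list.
h3strs_all = delayed(["8928308280fffff", "8928308280bffff", "89283082807ffff"])

# Slicing is recorded lazily and returns a new Delayed object.
first_two = h3strs_all[0:2]
print(type(first_two).__name__)

# Iterating needs a length, which Dask cannot know without computing.
message = None
try:
    for s in h3strs_all[0:2]:
        pass
except TypeError as err:
    message = str(err)
print("TypeError:", message)

# After compute() the result is a concrete list that iterates normally.
h3strs_all0 = h3strs_all.compute()
print([s[:4] for s in h3strs_all0[0:2]])
```

If the data already sits in memory, as it seems to here, working with the concrete list avoids the problem altogether.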

What do you mean when you say that the CPU usage reaches 100% but it is way slower than when you don’t use a function? High CPU usage sounds good: it suggests that your parallelization is working and that you are using all available CPU power. Is your total runtime longer when you use Dask Delayed?

Hi @MaximLippeveld

Yes, you are correct. The total run time is much longer than with a plain for loop. That is why I am struggling with this. The plain for loop takes about three and a half minutes, but the delayed version takes over 20 minutes.
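A back-of-envelope estimate suggests where the extra time may go. The Dask best practices mention that every delayed task carries an overhead of a few hundred microseconds. The two per-task figures below are assumed bounds, not measurements from this workload:

```python
n_cells = 5_175_148   # array length from the question
n_batches = 40        # r in the question

# ASSUMPTION: rough lower and upper bounds for per-task scheduling overhead.
overhead_low_s = 200e-6
overhead_high_s = 1e-3

def scheduling_minutes(n_tasks, overhead_s):
    """Total scheduling overhead in minutes for n_tasks tasks."""
    return n_tasks * overhead_s / 60

for label, n_tasks in [("one task per cell", n_cells),
                       ("one task per batch", n_batches)]:
    low = scheduling_minutes(n_tasks, overhead_low_s)
    high = scheduling_minutes(n_tasks, overhead_high_s)
    print(f"{label}: {low:.4f} to {high:.4f} minutes of overhead")
```

Under these assumptions, one task per cell costs roughly 17 to 86 minutes of overhead alone, which is in the same range as the 20 minutes reported. Wrapping every string in its own delayed call would add further graph entries on top of that.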

How is hexagons_all computed? I’m not sure, but you might need to call persist on it so that it does not get recomputed in every loop iteration.

Regarding how you use h3strs_all in your second block of code, the documentation states:

Delayed objects cannot be used for control flow, meaning that no Delayed can appear in a loop or if statement.

You can, however, index a Delayed object inside the body of a for loop, so you can solve the problem by rewriting your list comprehension as a for loop over indices. Instead of this:

for i in range(r):
  begin = int(i*size/r)
  end = int((i+1)*size/r)
  neighbors.append([
    delayed(search_neighbors)(hexagons_all, delayed(h3str))
    for h3str in h3strs_all[begin:end]
  ])

You could write this:

for i in range(r):
  begin = int(i*size/r)
  end = int((i+1)*size/r)
  neighbors_nested = []
  for j in range(begin, end):
    neighbors_nested.append(delayed(search_neighbors)(hexagons_all, delayed(h3strs_all[j])))
  neighbors.append(neighbors_nested)

This is untested so I’m not entirely sure this will work.

Also, I assume here

neighbors = compute(*neighbors)

compute is dask.compute?
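If it is dask.compute, one detail may matter for the next line of the original code. Called with several arguments, dask.compute returns a tuple with one result per argument, so neighbors.T is not available until the results are stacked into an array. A small sketch, where fake_neighbors is a hypothetical stand-in for search_neighbors:

```python
import numpy as np
from dask import compute, delayed

@delayed
def fake_neighbors(i):
    # Hypothetical stand-in for search_neighbors: six indices per cell.
    return np.arange(6) + i

# Two batches of delayed results, shaped like `neighbors` in the question.
batches = [[fake_neighbors(0), fake_neighbors(10)], [fake_neighbors(20)]]

results = compute(*batches)
print(type(results).__name__)   # tuple

# Flatten the batches into one (n_cells, 6) array so that .T works.
neighbors = np.array([row for batch in results for row in batch])
print(neighbors.shape)          # (3, 6)
print(neighbors.T.shape)        # (6, 3)
```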

Hi @MaximLippeveld

Regarding the points you raised:

  1. hexagons_all comes from pandas. I wrapped it in delayed as well, following the example.

  2. I have tried something similar:

for i in range(r):
  begin = int(i*size/r)
  end = int((i+1)*size/r)
  neighbors_nested = []
  for j in range(begin, end):
    neighbors_nested.append(delayed(search_neighbors)(hexagons_all, delayed(h3strs_all[j])))
  neighbors.append(neighbors_nested)

Since it indexes inside a loop, it takes a couple of minutes to complete the task. The overall computation is not efficient either.
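One possible direction, in line with the "avoid too many tasks" advice in the Dask best practices, is to create one delayed call per batch and run the ordinary Python loop inside it. The sketch below is illustrative only. The function neighbors_for_batch, the integer cell ids and the two-neighbor "ring" are made-up stand-ins for the h3 calls, and the inputs are assumed to be concrete arrays rather than Delayed objects:

```python
import numpy as np
from dask import compute, delayed

def neighbors_for_batch(hexagons_sorted, cells):
    """Process a whole batch in an ordinary loop inside ONE task."""
    out = []
    for cell in cells:
        # Hypothetical stand-in for the h3 ring lookup: two adjacent ids.
        ring = np.array([cell - 1, cell + 1])
        out.append(np.searchsorted(hexagons_sorted, ring))
    return np.array(out)

# Hypothetical concrete inputs (already in memory, not Delayed).
hexagons_all0 = np.arange(0, 1000, dtype=np.int64)
cells_all0 = hexagons_all0[1:-1]

r = 4  # number of batches; the question uses 40
bounds = np.linspace(0, len(cells_all0), r + 1, dtype=int)

# Wrap the large lookup array once and reuse it in every call.
hexagons_d = delayed(hexagons_all0)
tasks = [
    delayed(neighbors_for_batch)(hexagons_d, cells_all0[lo:hi])
    for lo, hi in zip(bounds[:-1], bounds[1:])
]

# r tasks in total instead of one per cell.
neighbors = np.concatenate(compute(*tasks))
print(neighbors.shape)
```

Whether this actually beats the plain loop depends on the per-cell work. dask.delayed uses the threaded scheduler by default, so if the h3 calls hold the GIL, threads may not help, and compute(*tasks, scheduler="processes") could be worth trying.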

  3. Yes, you are right. compute is dask.compute.