Advice on how to structure Dask computation

I’m running a computation that is split between a local machine and a Slurm cluster.

On the local machine I run:

  1. Start a Dask client connected to the SLURMCluster.
  2. A for loop generates data vectors of length N from a very long time series of length T, one vector every 200 or so days, where T >> N.
  3. As soon as the data vector for a time t is ready, a function that uses it is submitted to the client.
  4. After all the functions are submitted, a gather call is made and the output data is retrieved to the local machine.
  5. The output data (which is large) from the different functions is then further processed to generate the final results.
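
Roughly, the structure looks like this (the SLURM settings and the make_vector / analyze / postprocess functions here are simplified stand-ins for my real code):

import numpy
from dask.distributed import Client
from dask_jobqueue import SLURMCluster

T, N = 200_000, 512                              # very long series, T >> N

def make_vector(series, t, n):
    # step 2: build a length-N data vector for time t (stand-in)
    return series[t:t + n]

def analyze(vec):
    # step 3: the per-vector function, producing a large output (stand-in)
    return numpy.outer(vec, vec)

def postprocess(outputs):
    # step 5: combine all the large outputs into the final result (stand-in)
    return sum(o.sum() for o in outputs)

cluster = SLURMCluster(cores=4, memory="16GB")   # step 1
cluster.scale(jobs=10)
client = Client(cluster)

time_series = numpy.random.normal(size=T)
futures = []
for t in range(0, T - N, 200):                   # one vector every ~200 days
    vec = make_vector(time_series, t, N)
    futures.append(client.submit(analyze, vec))  # submitted as soon as it is ready

outputs = client.gather(futures)                 # step 4: this is the slow part
final = postprocess(outputs)                     # step 5, on the local machine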

Step 4 takes a long time. What is a smarter/more efficient way of doing this? Is this a case where an Actor would help? All I want is to retrieve the final results.

Many thanks for any advice!

Hi @liberabaci,

Could this part of the code (the for loop in step 2) run on the Workers? How is your time series stored?

Just to be clear, are you using Client.submit in a for loop?

Do you need all of the results before you can process them? Could you process them batch by batch?

A few possibilities I see, depending on your workflow constraints:

  • Use the as_completed pattern (see Futures — Dask documentation); this would allow you to gather data as soon as it is processed (a sketch follows this list).
  • Process the results directly on the Dask cluster, and not on the Client side.
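
For the first point, here is a minimal sketch of the as_completed pattern (analyze is just a placeholder for your per-vector function, and the plain Client() stands for the client connected to your cluster):

import numpy
from dask.distributed import Client, as_completed

client = Client()                      # in your case, the client connected to the SLURMCluster

def analyze(vec):
    return numpy.sum(vec ** 2)         # placeholder for the real per-vector function

futures = [client.submit(analyze, numpy.random.normal(size=10))
           for _ in range(100)]

# Handle each result as soon as its Future finishes, instead of waiting
# for one big gather at the end.
total = 0.0
for future in as_completed(futures):
    total += future.result()
print(total)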

Could you detail a bit more your workflow with some code snippets?

Hi @guillaumeeb,

The main for loop could be run on a worker, but why would that provide an advantage? I’m asking because that is unclear to me. To do so, would I just wrap the main loop in a function and call client.submit on it? The data is stored in an HDF file.

Yes, I am using Client.submit in a for loop.

Processing the results directly on the cluster is the part that’s unclear to me. Is this done by having a worker call client.submit?

Yes. I’m working on adding a simplified example.

Many thanks for your help!

Couldn’t you load all the data on the Workers, for example with read_hdf from Dask DataFrame or by wrapping the HDF5 dataset in a Dask Array, and then work with it there? I was not suggesting running a for loop on a Worker, but reading the data in a distributed way.
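
For example, something along these lines, assuming the series is one big HDF5 dataset (the file name, dataset key and chunk size are guesses about your layout):

import h5py
import dask.array as da

N = 512
f = h5py.File("timeseries.h5", mode="r")          # your HDF file
ts = da.from_array(f["series"], chunks=200_000)   # lazy, chunked view of the series

# Slices of `ts` are computed as chunked tasks (on the Workers when a
# distributed client is active), so the length-N vectors never have to
# be built on the client and shipped over.
vec = ts[1000:1000 + N]
print(vec.sum().compute())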

Nope, it’s done by submitting another function that takes as input a Future from a previous submit call. But you could also use the Delayed API.
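
Here is a rough sketch of that chaining (make_chunk and combine are just illustrative names):

import numpy
from dask.distributed import Client

client = Client()                        # in your case, the client connected to the SLURMCluster

def make_chunk(n):
    return numpy.random.normal(size=n)

def combine(chunks):
    return sum(numpy.sum(c) for c in chunks)

futures = [client.submit(make_chunk, 10) for _ in range(100)]
final = client.submit(combine, futures)  # Futures passed as arguments are resolved
print(final.result())                    # on the Workers; only the small final value comes back

# The Delayed API builds the same graph lazily:
# parts = [dask.delayed(make_chunk)(10) for _ in range(100)]
# final_value = dask.delayed(combine)(parts).compute()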

Hi @guillaumeeb,

Thanks for your response. I think the Delayed API, or passing the last computation’s Futures to another function and submitting that, will be the solution to my issue. That said, I tried running the following code to test my understanding:

import numpy
from dask import delayed

client_obj = start_dask(num_workers=5)  # my helper: returns a distributed Client connected to the cluster
with_client_submit = 1                  # set to 0 to run the pure-Delayed branch below

@delayed
def make_random_data(num_data_points):
    # generate a random data vector
    return numpy.random.normal(loc=2.0, scale=1.0, size=num_data_points)

@delayed
def process(data_list):
    # sum all the data vectors into one number
    final_val = 0.0
    for data_obj in data_list:
        final_val += numpy.sum(data_obj)
    return final_val

if with_client_submit:
    # submit the (delayed-decorated) functions through the client
    data_list = []
    for idx in range(100):
        obj = client_obj.submit(make_random_data, 10)
        data_list.append(obj)

    obj = client_obj.submit(process, data_list)
    result = client_obj.gather(obj)
    print('l46', result.compute(), len(data_list))

else:
    # build the whole graph with delayed calls and compute it at the end
    data_list = []
    for idx in range(100):
        obj = delayed(make_random_data)(10)
        data_list.append(obj)

    result = delayed(process)(data_list)
    print('l60', result.compute(), len(data_list))

Whether I run with with_client_submit=1 or 0 I get the same result. However, when I look at the dashboard I see this difference:

with_client_submit=1
I get 1 make_random_data job and 1 process job


with_client_submit=0
I get 100 make_random_data jobs and 1 process job

Why do I only see 1 make_random_data job in the first case?
len(data_list) = 100 for both cases.