Advice on how to structure Dask computation

I’m running a computation that is shared between a local machine and a Slurm cluster.

On the local machine I run:

  1. start a Dask client connected to the SLURMCluster
  2. a for loop generates data vectors of length N from a very long time series of length T, one vector roughly every 200 days (T >> N)
  3. As soon as a data vector for a time t is ready, a function that uses the data vectors is submitted to the client
  4. After all the functions are submitted, a gather call is made and the output data is retrieved to the local machine.
  5. The output data (which is large) from different functions is then further processed to generate the final results.
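
In rough pseudo-code, the structure looks like this (all function names, variable names, and resource numbers below are placeholders, not my actual code):

from dask.distributed import Client
from dask_jobqueue import SLURMCluster

cluster = SLURMCluster(cores=4, memory="16GB")    # step 1: connect to the Slurm cluster
cluster.scale(jobs=10)
client = Client(cluster)

futures = []
for t in range(0, T, 200):                        # step 2: one data vector every ~200 days
    vec = generate_vector(timeseries, t, N)       # built locally from the long time series
    futures.append(client.submit(process_vector, vec))   # step 3: submit as soon as ready

outputs = client.gather(futures)                  # step 4: this is the slow part
final = combine(outputs)                          # step 5: local post-processing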

Step 4 takes a long time. What is a smarter/more efficient way of doing this? Is this a case where an Actor would help? All I want is to retrieve the final results.

Many thanks for any advice!

Hi @liberabaci,

Could this part of the code run on the Worker? How is your time-series stored?

Just to be clear, are you using Client.submit in a for loop?

Do you need all the results at once to process them? Could you process them batch by batch?

A few possibilities I see, depending on your workflow constraints:

  • Use the as_completed pattern (Futures — Dask documentation); this would allow you to gather results as soon as they are processed (see the sketch after this list).
  • Process the results directly on the Dask cluster, not on the Client side.
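
For the first option, a minimal sketch (client, process_vector, vectors and accumulate are placeholder names standing in for your own objects and functions):

from dask.distributed import as_completed

futures = [client.submit(process_vector, vec) for vec in vectors]

# Handle each output as soon as its task finishes, instead of one big gather at the end
for future in as_completed(futures):
    partial = future.result()   # transfer this single result to the Client
    accumulate(partial)         # e.g. fold it into a running aggregate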

Could you detail a bit more your workflow with some code snippets?

Hi @guillaumeeb,

The main for loop could be run on a worker, but why would that provide an advantage? I’m actually asking because that is unclear to me. To do so, would I just wrap the main loop in a function and call client.submit on it? The data is stored in an HDF file.

Yes

That’s the part that’s unclear to me. Is this done by getting a worker to call client.submit?

Yes. I’m working on adding a simplified example.

Many thanks for your help!

Couldn’t you load all the data on the Workers, using for instance read_hdf from Dask DataFrame, and then work with it there? I was not suggesting running a for loop on a Worker, but reading the data in a distributed way.
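
Something along these lines, for example (the file path, key and chunk size are placeholders):

import dask.dataframe as dd

# Each partition is read lazily and loaded by the Workers, assuming a shared filesystem
ddf = dd.read_hdf("timeseries.h5", key="/data", chunksize=1_000_000)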

Nope, by submitting another function that takes as input the Futures from the previous submit calls. But you could also use the Delayed API.
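
For example (process_vector and combine are placeholder names for your own functions):

# Each submit returns a Future; passing those Futures to another submit chains the
# work on the cluster, so the intermediate results never travel back to the Client.
vector_futures = [client.submit(process_vector, vec) for vec in vectors]
final_future = client.submit(combine, vector_futures)

result = final_future.result()   # only the final (smaller) result is transferred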

Hi @guillaumeeb,

Thanks for your response. I think the Delayed API, or passing the previous computations to a final function and submitting it, will be the solution to my issue. That said, I tried running the following code to test my understanding:

import numpy
from dask import delayed

# start_dask is my own helper that returns a distributed Client
client_obj = start_dask(num_workers=5)

@delayed
def make_random_data(num_data_points):
    return numpy.random.normal(loc=2.0, scale=1.0, size=num_data_points)

@delayed
def process(data_list):
    final_val = 0.0
    for data_obj in data_list:
        final_val += numpy.sum(data_obj)
    return final_val

if with_client_submit:
    # submit each task as a Future, then submit the final step on top of them
    data_list = []
    for idx in range(100):
        obj = client_obj.submit(make_random_data, 10)
        data_list.append(obj)

    obj = client_obj.submit(process, data_list)
    result = client_obj.gather(obj)
    print('l46', result.compute(), len(data_list))

else:
    # build the whole graph lazily with Delayed and compute it once
    data_list = []
    for idx in range(100):
        obj = delayed(make_random_data)(10)
        data_list.append(obj)

    result = delayed(process)(data_list)
    print('l60', result.compute(), len(data_list))

Whether I run with with_client_submit=1 or 0 I get the same result; however, when I look at the dashboard I see this difference:

with_client_submit=1
I get 1 make_random_data job and 1 process job


with_client_submit=0
I get 100 make_random_data jobs and 1 process job

Why do I only see 1 make_random_data job in the first case?
len(data_list) = 100 for both cases.

Two things:

  • First, you shouldn’t mix Delayed and Futures that way, i.e. do not submit a Delayed function and then call result.compute() after gather(). You can combine Delayed and Futures, but it is not advised to submit a delayed call, which won’t be resolved until a compute call. So just remove the delayed decorator from the functions. Moreover, in the second branch you also wrap the already-decorated functions with delayed(), which makes a double Delayed call.
  • As for your question, Dask considers submitted functions as pure by default, which means the same function with the same arguments will always return the same result. So in your case, since you are calling the same function 100 times with the same argument, it is only executed once. This does not happen with Delayed because delayed calls are treated as impure by default, so each of the 100 calls gets its own key.

You can use

obj = client_obj.submit(make_random_data, 10, pure=False)

to force the function to be re-evaluated 100 times.
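
For illustration, once the delayed decorator is removed, you can see the deduplication directly on the Future keys:

f1 = client_obj.submit(make_random_data, 10)
f2 = client_obj.submit(make_random_data, 10)
print(f1.key == f2.key)   # True: same pure function + same argument -> same task

f3 = client_obj.submit(make_random_data, 10, pure=False)
print(f1.key == f3.key)   # False: pure=False generates a unique key, so it runs again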

All clear. Thanks so much for the guidance.


Hi @guillaumeeb,

I’ve gotten both approaches to work for me. Is there any difference in the way tasks and memory are handled between using the Delayed API and submitting a final function that takes a set of Futures as arguments? Many thanks again for your help.