I’m running a computation that is shared between a local machine and a Slurm cluster.
On the local machine I run:
1. Start a Dask client connected to the SLURMCluster.
2. A for loop generates data vectors of length N, one every 200 or so days, from a very long time series of length T (T >> N).
3. As soon as the data vector for a time t is ready, a function that uses it is submitted to the client.
4. After all the functions are submitted, a gather call is made and the output data is retrieved to the local machine.
5. The output data (which is large) from the different functions is then processed further to generate the final results.
Step 4 (the gather) takes a long time. What is a smarter or more efficient way of doing this? Is this a case where an Actor would help? All I want is to retrieve the final results.
Many thanks for any advice!
Hi @liberabaci,
Could this part of the code (the for loop that generates the data vectors) run on the workers? How is your time series stored?
Just to be clear: are you using Client.submit in a for loop?
Do you need all of the results at once to process them, or could you process them batch by batch?
A few possibilities I see, depending on your workflow constraints:
- Use the structure described in the Dask documentation on Futures; this would allow you to gather data as soon as it is processed.
- Process the results directly on the Dask cluster, not on the client side.
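For illustration, a minimal sketch combining both suggestions with `dask.distributed.as_completed`, which yields futures in the order they finish. `analyse_window` and `summarise` are hypothetical stand-ins for the poster's functions, and an in-process local `Client` stands in for the SLURMCluster. The point is that the reduction runs on the workers, so only small summaries travel back to the client, and post-processing can start before every task is done.

```python
import numpy as np
from dask.distributed import Client, as_completed


def analyse_window(t):
    # Hypothetical stand-in for the per-vector function: returns an
    # array playing the role of the (large) output for window t.
    rng = np.random.default_rng(t)
    return rng.normal(loc=2.0, scale=1.0, size=1_000)


def summarise(arr):
    # Hypothetical reduction that runs on a worker, so only one float
    # travels back to the client instead of the whole array.
    return float(arr.mean())


def run(client, n_windows=20):
    outputs = [client.submit(analyse_window, t) for t in range(n_windows)]
    # Chain the reduction onto each output future; Dask passes the
    # result of the future to summarise on the worker.
    summaries = [client.submit(summarise, f) for f in outputs]
    key_to_window = {f.key: t for t, f in enumerate(summaries)}
    results = {}
    # as_completed yields futures in completion order.
    for fut in as_completed(summaries):
        results[key_to_window[fut.key]] = fut.result()
    return results


if __name__ == "__main__":
    with Client(processes=False) as client:
        print(run(client, n_windows=5))
```

Batch-by-batch processing would follow the same pattern: consume the `as_completed` iterator in groups instead of one future at a time.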
Could you detail a bit more your workflow with some code snippets?
Hi @guillaumeeb,
The main for loop could be run on a worker, but why would that be an advantage? I’m genuinely asking, because that is unclear to me. To do so, would I just wrap the main loop in a function and call client.submit on it? The data is stored in an HDF file.
Processing the results directly on the cluster is the part that’s unclear to me. Is this done by having a worker call client.submit?
Yes, I’m using Client.submit in a for loop. I’m working on a simplified example.
Many thanks for your help!
Couldn’t you load all the data onto the workers, using read_hdf from Dask DataFrame (or dask.array.from_array on an h5py dataset), and then work with it? I didn’t mean running a for loop, but reading the data in a distributed way.
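A minimal sketch of the distributed-loading idea, assuming the series is a 1-D array that h5py can open and that each data vector is a contiguous, non-overlapping block of length N (the real windowing in the original workflow may differ). `window_sums` and the HDF path in the comment are hypothetical; a NumPy array stands in for the h5py dataset so the sketch runs on its own. When a `Client` is active, `.compute()` executes the chunked reads and the sums on the cluster.

```python
import numpy as np
import dask.array as da


def window_sums(series, n):
    # In practice `series` could be an h5py Dataset, e.g.
    # h5py.File("data.h5")["/series"] (hypothetical path); da.from_array
    # then reads each chunk lazily where the task runs.
    x = da.from_array(series, chunks=n)
    # Keep only complete windows of length n and view them as rows.
    n_windows = x.shape[0] // n
    windows = x[: n_windows * n].reshape(n_windows, n)
    # Reduce every window in parallel; only n_windows numbers come back.
    return windows.sum(axis=1).compute()


if __name__ == "__main__":
    print(window_sums(np.arange(10, dtype=float), 3))
```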
Nope, by submitting another function that takes as input the Future returned by a previous submit call. You could also use the Delayed API.
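A minimal sketch of chaining futures, assuming hypothetical stand-ins `make_data`, `analyse` and `combine` for the poster's functions. Each `submit` receives the future from the previous step, and the final reduction receives a list of futures, which Dask resolves to their results on the worker. Only the small final value is transferred to the client; the large intermediate outputs stay on the cluster. The same graph could be built with `dask.delayed` instead.

```python
import numpy as np
from dask.distributed import Client


def make_data(t, n):
    # Hypothetical stand-in for building the data vector for time t.
    return np.full(n, float(t))


def analyse(vec):
    # Hypothetical stand-in for the expensive per-vector function.
    return vec * 2.0


def combine(outputs):
    # Final reduction; `outputs` arrives as a list of arrays because
    # Dask replaces the futures with their results on the worker.
    return float(sum(o.sum() for o in outputs))


def run(client, times, n):
    outputs = []
    for t in times:
        vec = client.submit(make_data, t, n)
        outputs.append(client.submit(analyse, vec))  # takes a Future as input
    final = client.submit(combine, outputs)
    return final.result()  # only the final float is transferred


if __name__ == "__main__":
    with Client(processes=False) as client:
        print(run(client, range(5), 1000))
```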
Hi @guillaumeeb,
Thanks for your response. I think either the Delayed API or passing the futures from the previous computation to a function and submitting it will solve my issue. That said, I tried running the following code to test my understanding:
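```python
import numpy
from dask import delayed

# start_dask is my own helper (not shown): it starts a SLURMCluster
# and returns a connected dask.distributed Client.
client_obj = start_dask(num_workers=5)

# Set to 0 to use the Delayed API instead of Client.submit.
with_client_submit = 1

def make_random_data(num_data_points):
    return numpy.random.normal(loc=2.0, scale=1.0, size=num_data_points)

def process(data_list):
    # Sum every element of every data vector.
    final_val = 0.0
    for data_obj in data_list:
        final_val += numpy.sum(data_obj)
    return final_val

if with_client_submit:
    # Futures API: 100 data-generation tasks, then one task that receives
    # the list of futures (Dask resolves them to their results).
    data_list = []
    for idx in range(100):
        obj = client_obj.submit(make_random_data, 10)
        data_list.append(obj)
    obj = client_obj.submit(process, data_list)
    result = client_obj.gather(obj)  # gather already returns the value
    print('l46', result, len(data_list))
else:
    # Delayed API: build the same graph lazily, then compute it.
    data_list = []
    for idx in range(100):
        obj = delayed(make_random_data)(10)
        data_list.append(obj)
    result = delayed(process)(data_list)
    print('l60', result.compute(), len(data_list))
```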
Whether I run with with_client_submit=1 or 0, I get the same result. However, when I look at the dashboard I see this difference:
- with_client_submit=1: I get 1 make_random_data job and 1 process job.
- with_client_submit=0: I get 100 make_random_data jobs and 1 process job.
Why do I only see 1 make_random_data job in the first case?
len(data_list) = 100 for both cases.
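A likely explanation, based on the documented defaults: `Client.submit` treats functions as pure unless `pure=False` is passed, so 100 identical calls to `make_random_data(10)` hash to the same task key and run only once, and all 100 entries of `data_list` refer to the same random vector. `dask.delayed` calls are impure by default, so each call gets its own key. The sketch below counts the distinct keys in each case; `count_submit_keys` and `count_delayed_keys` are hypothetical helper names, and a local in-process `Client` stands in for the SLURMCluster.

```python
import numpy as np
from dask import delayed
from dask.distributed import Client


def make_random_data(num_data_points):
    return np.random.normal(loc=2.0, scale=1.0, size=num_data_points)


def count_submit_keys(client, pure):
    # With pure=True (the default) the key is derived from the function
    # and its arguments, so identical calls collapse into a single task.
    futures = [client.submit(make_random_data, 10, pure=pure) for _ in range(100)]
    return len({f.key for f in futures})


def count_delayed_keys():
    # dask.delayed calls are impure by default: every call gets a fresh key.
    objs = [delayed(make_random_data)(10) for _ in range(100)]
    return len({o.key for o in objs})


if __name__ == "__main__":
    with Client(processes=False) as client:
        print(count_submit_keys(client, pure=True))   # 1
        print(count_submit_keys(client, pure=False))  # 100
        print(count_delayed_keys())                   # 100
```

For a random-number generator like this one, `pure=False` is the appropriate setting, since otherwise every "different" data vector is in fact the same one.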