I am new to dask and dask's whole ecosystem, so it is possible that I did not understand the difference between dask, dask-jobqueue, and dask-distributed, or that I am getting lost in the jargon of the ecosystem.
I have a bit of code set up as follows:
import dask
from dask import delayed
import my_module  # the module containing my_function

# ref_series is a constant object; output_path is a constant str;
# input_file_list is the iterable of input files, input_item one element of it
delayed_results = [
    delayed(my_module.my_function)(
        ref_series=ref_series,
        input=input_item,
        output_dir=output_path,
        remove=False,  # remove is a boolean
    )
    for input_item in input_file_list
]

# pbs_client is a distributed.Client connected to the PBSCluster
pbs_client.upload_file("my_module.py")
pbs_client.scatter(ref_series)  # a constant object, so it felt appropriate to give it to all the workers
dask.compute(*delayed_results)
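For completeness, pbs_client is a dask.distributed.Client connected to a dask_jobqueue.PBSCluster. The exact construction is not shown above, but it is roughly the following (a sketch; the specific arguments are assumptions, with the resource numbers mirroring my machines):

from dask_jobqueue import PBSCluster
from dask.distributed import Client

# Each worker job mirrors the single-machine shape: 2 cores, 16 GB.
cluster = PBSCluster(cores=2, memory="16GB")
cluster.scale(jobs=20)  # ask PBS for 20 worker jobs

pbs_client = Client(cluster)  # connect a client to the cluster's scheduler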
Something of note is that the function does not return anything; it just writes its output directly to a file.
The function calls a module that has been meticulously optimized and verified to work on the input files: for every file in the list, I verified that it produces its output correctly in about a second, give or take milliseconds, on a 16 GB, 2-core machine. The next step in my pipeline was to scale this module horizontally to multiple machines of the same shape. Processing the 4,000 input files on a single machine took about 4,000 seconds (verified), but given that I have access to multiple machines on a PBSCluster, I wanted to use dask to scale the pipeline in a more pythonic way. I used the code above on a PBSCluster made using dask, and it just … stalls. The cluster has 20 workers and one scheduler ("mother") node, so I was under the impression that the runtime should have been roughly (number of input files) / (number of workers) seconds, i.e. about 4,000 / 20 = 200 seconds at one second per file. Instead, the workers keep logging the following warning:

2023-12-14 03:52:55,536 - distributed.core - INFO - Event loop was unresponsive in Worker for 5.43s. This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
So, because of the stalling, a single machine gets the job done faster than the cluster.
The warning confuses me because the workers have everything they need by the time dask.compute runs: all the function's inputs are available to the workers locally, and the constants and variables have been passed. I am also not sure whether the workers are complaining about their own GIL or about the scheduler's GIL.
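For reference, my understanding of the intended pattern is that scatter returns a future which you then pass into the delayed calls, so every task reuses the one copy already on the cluster; something like this (a sketch of what I think the pattern is, not what I actually ran):

# Scatter once, with broadcast=True so every worker holds a copy,
# then pass the returned future into the delayed calls.
ref_series_future = pbs_client.scatter(ref_series, broadcast=True)

delayed_results = [
    delayed(my_module.my_function)(
        ref_series=ref_series_future,  # resolves to ref_series on the worker
        input=input_item,
        output_dir=output_path,
        remove=False,
    )
    for input_item in input_file_list
]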
Is the above setup not the right way to horizontally distribute function calls across a number of machines? Did I misunderstand the goal of dask.compute, and if so, should I use something else to naively distribute a simple list of function calls (same function, different inputs) across a number of machines?
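For concreteness, the kind of alternative I am imagining is Client.map, which submits one task per input and forwards the keyword arguments to every call (a sketch under the same assumptions as above; I have not tried it):

from dask.distributed import wait

# One task per input file; keyword arguments are sent to every call.
futures = pbs_client.map(
    my_module.my_function,
    input_file_list,
    ref_series=ref_series,
    output_dir=output_path,
    remove=False,
)
wait(futures)  # block until all output files have been written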