I am new to Dask and its whole ecosystem, so it is possible that I have not understood the difference between dask, dask-jobqueue, and dask.distributed, or that I am getting lost in the ecosystem's jargon.
I have a bit of code set up as follows:
import dask
from dask import delayed

# ref_series is a constant object
# input_item comes from the iterable input_file_list
# output_path is a constant str
# remove is a boolean
delayed_results = [
    delayed(my_module.my_function)(
        ref_series=ref_series,
        input=input_item,
        output_dir=output_path,
        remove=False,
    )
    for input_item in input_file_list
]

# pbs_client is a distributed.Client connected to a PBSCluster
pbs_client.upload_file("my_module.py")
# given that ref_series is a constant object, it felt appropriate
# to give it to all the workers
pbs_client.scatter(ref_series)

dask.compute(*delayed_results)
Something of note is that the function does not return anything; it writes its output directly to file.
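(For context on the `scatter` call above: `Client.scatter` returns a future pointing at the data already on the cluster, and the usual pattern is to pass that future, rather than the original object, into the delayed calls. A minimal, self-contained sketch of that pattern on a local in-process client, where `process_one` and `ref_value` are hypothetical stand-ins for `my_module.my_function` and `ref_series`:)

```python
import dask
from dask import delayed
from distributed import Client

def process_one(ref, x):
    # hypothetical stand-in for my_module.my_function:
    # combine the shared reference object with one input item
    return ref + x

client = Client(processes=False)  # local in-process client; no PBS needed

ref_value = 100                   # stand-in for the constant ref_series object
ref_future = client.scatter(ref_value, broadcast=True)  # ship it to the workers once

# pass the *future* (not the original object) into each delayed call,
# so workers use the already-scattered copy instead of re-serializing it
tasks = [delayed(process_one)(ref_future, x) for x in range(5)]
results = dask.compute(*tasks)
client.close()
```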
The function calls a module that has been meticulously optimized and verified to work on the input files. For every file in the input list, I verified that it properly produces output in about a second, give or take milliseconds, on a 16 GB, 2-core machine. The next step in my pipeline was to scale this module horizontally across multiple machines of the same shape: 4,000 input files processed on a single machine took about 4,000 seconds (verified), but since I have access to multiple machines on a PBSCluster, I wanted to use Dask to scale the pipeline in a more Pythonic way.

I ran the code above on a PBSCluster made using Dask and it just ... stalls. The cluster has 20 workers plus one mother worker, so I was under the impression that the total runtime should have been roughly (number of input files) / (number of workers) seconds. The workers produce the following message:

2023-12-14 03:52:55,536 - distributed.core - INFO - Event loop was unresponsive in Worker for 5.43s. This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.

Because of the stalling, a single machine gets the job done faster than the cluster.
The message confuses me because the workers have everything they need from dask.compute: all of the function's inputs are available to the workers locally, and the constants and variables have been passed. I am not sure whether the workers are raising warnings about their own GIL or about the mother worker's GIL.
Is the above setup not the right way to horizontally distribute function calls across a number of machines? Did I misunderstand the goals of dask.compute, and if so, should I use something else to naively distribute a simple list of function calls (same function, different inputs) across a number of machines?
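(For reference, the "same function, different inputs" pattern I am describing is what I understand `Client.map` to do: one task per input, each scheduled onto whichever worker is free, with `gather` blocking until everything finishes. A minimal sketch on a local in-process client, where `my_function` is a hypothetical stand-in for the real function, which writes a file and returns nothing:)

```python
from distributed import Client

def my_function(x):
    # hypothetical stand-in: the real function writes output to a file
    return x * 2

client = Client(processes=False)  # local in-process client; no PBS needed

# submit one task per input; each runs on whichever worker is free
futures = client.map(my_function, [1, 2, 3, 4])
outputs = client.gather(futures)  # block until all tasks finish
client.close()
```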