Dask cluster raises issues about GIL on a horizontally scaled setup

I am new to Dask and its whole ecosystem, so it is possible that I did not understand the difference between dask, dask-jobqueue and dask-distributed, or that I am getting lost in the jargon of the ecosystem.

I have a bit of code set up as follows:


import dask
from dask import delayed

# ref_series is a constant object
# input_item is an item of input_file_list (an iterable)
# output_path is a constant: str
# remove is a boolean
delayed_results = [
    delayed(my_module.my_function)(
        ref_series=ref_series,
        input=input_item,
        output_dir=output_path,
        remove=False,
    )
    for input_item in input_file_list
]

# pbs_client is the distributed Client connected to the PBSCluster
pbs_client.upload_file("my_module.py")
pbs_client.scatter(ref_series)  # given that it is a constant object it felt appropriate to give it to all the workers

dask.compute(*delayed_results)

Something of note is that the function does not return anything and just writes output directly to file.
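For context, here is my mental model of what scatter does, sketched out. This may be exactly where I am going wrong; ref_future is just a hypothetical name, and I do not know whether the scattered future needs to be passed into the delayed calls or whether calling scatter on its own is enough:

# Sketch of my understanding only; pbs_client is the Client connected to the PBSCluster.
ref_future = pbs_client.scatter(ref_series, broadcast=True)  # copy ref_series to every worker

delayed_results = [
    delayed(my_module.my_function)(
        ref_series=ref_future,  # pass the future instead of the object itself?
        input=input_item,
        output_dir=output_path,
        remove=False,
    )
    for input_item in input_file_list
]
dask.compute(*delayed_results)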

The function calls a module that has been meticulously optimized and verified to work on the input files. For every file in the input list, I verified that it properly produces output in about a second, give or take milliseconds, on a 16 GB, 2-core machine. The next step in my pipeline was to scale this module horizontally across multiple machines of the same shape. Processing 4,000 input files on a single machine took about 4,000 seconds (verified), but since I have access to multiple machines on a PBSCluster, I wanted to use Dask to scale the pipeline in a more Pythonic way.

I used the code above on a PBSCluster made using Dask and it just … stalls. The cluster has 20 workers and one mother worker, and I was under the impression that the total time should have been roughly (number of input files) / (number of workers) seconds. The workers are producing the following message:

2023-12-14 03:52:55,536 - distributed.core - INFO - Event loop was unresponsive in Worker for 5.43s. This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.

Because of the stalling, a single machine gets the job done faster than the cluster.

The message confuses me because the workers have everything they need from dask.compute: all of the function's inputs are available to them locally, and the constants and variables have been passed. I am also not sure whether the workers are raising warnings about their own GIL or the mother worker's GIL.

Is the above setup not the right way to horizontally distribute function calls across a number of machines? Did I misunderstand the goal of dask.compute, and if so, should I use something else to naively distribute a simple list of function calls (same function, different inputs) across a number of machines?

Hi @RijanHasTwoEars7, welcome to the Dask community!

I’m not sure what you are referring to when speaking of a mother worker. How do you start your PBSCluster?

The overall concept looks good, but maybe there is a little detail that causes everything to fail. In any case, your use of Delayed and dask.compute() is fine.

Did you first try using a LocalCluster? Did you look at the Dashboard to get some insight into what was happening to your tasks?
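Something along these lines would be enough to reproduce the workflow locally and watch the tasks in the Dashboard (just a sketch, reusing the names from your snippet and only a small subset of files):

import dask
from dask import delayed
from dask.distributed import Client, LocalCluster

# A tiny local cluster mirroring the shape of your 2-core machines.
cluster = LocalCluster(n_workers=2, threads_per_worker=1)
client = Client(cluster)
print(client.dashboard_link)  # open this URL to follow the tasks

delayed_results = [
    delayed(my_module.my_function)(
        ref_series=ref_series,
        input=input_item,
        output_dir=output_path,
        remove=False,
    )
    for input_item in input_file_list[:100]  # a small subset is enough to see the behavior
]
dask.compute(*delayed_results)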


welcome to the Dask community!

Much appreciated!

Mother worker

Apologies! My co-workers and I have been calling the script that manages/initiates the dask-workers the mother worker. Is the right term nanny or manager?

Talking of LocalCluster and PBSCluster

Unfortunately, the issue at hand is that the PBSCluster is really a PBSCluster implemented on top of SOCA on AWS EC2 instances. It is turtles all the way down in terms of abstractions and I am chasing error messages.

For now, the only concrete error message that I can really investigate is the one about the GIL from the dask-workers.

Is there something I can do to know which GIL the worker is waiting on? Its own, or the manager's/nanny's?

Finally, if each dask-worker is doing a lot of file I/O operations, would that cause GIL issues?

I’m not sure what you are talking about yet. Are you using the dask-worker CLI? The Nanny is a process that monitors Dask Workers on a given machine; it is used by default in a distributed setup.

Is it the PBSCluster from dask-jobqueue, or a custom implementation of yours? Do you have access to the Dashboard?

My idea was to try your code in another environment, your laptop or a single server, using LocalCluster to simplify things and be able to monitor more easily.

If you want to go deep, there is GIL monitoring.
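If I remember correctly, it relies on the optional gilknocker package installed on the workers plus a configuration flag; roughly like this (a sketch only, please double-check the exact key in the distributed documentation):

# Sketch only: requires the optional gilknocker package on scheduler and workers,
# and the configuration key may differ between distributed versions.
import dask

dask.config.set({"distributed.admin.system-monitor.gil.enabled": True})

# With this enabled before the cluster starts, the Workers page of the
# Dashboard reports how much of the time each worker's GIL is held.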

The problem must come from the Worker Python process.

IO does not cause GIL issues, only pure Python code.

What is the setup of your PBSCluster?
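For reference, a plain dask-jobqueue setup for machines of the shape you describe would look roughly like this (queue name, walltime and resource values are placeholders):

from dask_jobqueue import PBSCluster
from dask.distributed import Client

cluster = PBSCluster(
    cores=2,              # cores per worker job
    memory="16GB",        # memory per worker job
    queue="workq",        # placeholder PBS queue name
    walltime="02:00:00",  # placeholder walltime
)
cluster.scale(jobs=20)    # ask PBS for 20 worker jobs
pbs_client = Client(cluster)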