No jobs sent to workers

After much kerfuffle, I have managed to get a Dask cluster up and running on our HPC, which uses a PBS scheduler.

In a jupyter notebook, I create the cluster with this:

from dask_jobqueue import PBSCluster

cluster = PBSCluster(
    walltime="01:00:00",
    memory="8GB",
    processes=1,
    account="ao05",
    job_script_prologue=[
        "module load python/3.12.1",
        "module load R/4.3.1",
        "source /g/data/mc99/methylationClassifier/bin/mcenv/bin/activate",
        "export LD_LIBRARY_PATH=/apps/python3/3.12.1/lib:/apps/R/4.3.1/lib64/R/library/methods/libs/:$LD_LIBRARY_PATH",
    ],
    job_extra_directives=[
        "-q normalbw",
        "-l ncpus=1",
        "-l mem=8GB",
        "-l storage=gdata/mc99",
        "-W umask=027",
        "-l wd",
        "-m abe",
        "-l other=gdata",
    ],
    job_directives_skip=["select"],
)
cluster.scale(jobs=3)
client = cluster.get_client()
client

This works fine and dandy. If I launch the dashboard, I can see three workers fire up in the workers section, and if I check the queue in the terminal, there’s the main job running the notebook and three worker jobs.
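
As a quick sanity check with the client created above, submitting a trivial task directly should show up on one of the workers in the dashboard's task stream, e.g.:

# Trivial task sent straight to the cluster; result() blocks until a worker has run it.
future = client.submit(lambda x: x + 1, 10)
print(future.result())  # 11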

I have written a custom sklearn class to do feature reduction as part of a pipeline. I’m testing it on its own at the moment. As I understand it, I should be able to use joblib to submit jobs to the cluster that I’ve previously created, doing something like this:

from joblib import parallel_backend

with parallel_backend('dask', scheduler_host=client.scheduler_info()['address']):
    differentialMethylation.fit(filteredDataset, yValues)

I’m taking that from here

That code executes, but it doesn’t execute on any of the workers, and I can’t for the life of me see why. The code within the differentialMethylation class isn’t particularly dask aware yet (that’s a different question, which I’ll post once I’ve got this sorted), but even so, my expectation was that the fit would be sent to at least one of the worker nodes.

It’s my understanding that when using parallel_backend, you just have to make sure that the address you point scheduler_host at is the one for the cluster I’ve previously created, hence the client = cluster.get_client() to make sure it’s the same.
I’ve also tried it with

with parallel_backend('dask', scheduler_host=cluster):
    differentialMethylation.fit(filteredDataset, yValues)

That didn’t throw any errors, but it also didn’t send any jobs to the cluster.

I should also add that when I close the cluster down and the dask-worker.e123456 and dask-worker.o123456 logs are generated, there’s nothing in the .o file. The .e file looks like it’s just basic “I’m alive” communication between the scheduler, the worker and the nanny(?).

I’m sufficiently new with dask and parallelizing things that I may be missing some essential piece of diagnostic information here, happy to go looking if someone can point me in the right direction.

Cheers
Ben.

Edit
Adding the log from one of the workers:


Loading python3/3.12.1
  Loading requirement: intel-mkl/2023.2.0
2024-02-13 18:42:41,350 - distributed.nanny - INFO -         Start Nanny at: 'tcp://10.6.121.2:33353'
2024-02-13 18:42:42,850 - distributed.worker - INFO -       Start worker at:     tcp://10.6.121.2:32799
2024-02-13 18:42:42,851 - distributed.worker - INFO -          Listening to:     tcp://10.6.121.2:32799
2024-02-13 18:42:42,851 - distributed.worker - INFO -           Worker name:               PBSCluster-1
2024-02-13 18:42:42,851 - distributed.worker - INFO -          dashboard at:           10.6.121.2:38883
2024-02-13 18:42:42,851 - distributed.worker - INFO - Waiting to connect to:     tcp://10.6.121.2:33939
2024-02-13 18:42:42,851 - distributed.worker - INFO - -------------------------------------------------
2024-02-13 18:42:42,851 - distributed.worker - INFO -               Threads:                          1
2024-02-13 18:42:42,851 - distributed.worker - INFO -                Memory:                   7.45 GiB
2024-02-13 18:42:42,851 - distributed.worker - INFO -       Local Directory: /jobfs/108100525.gadi-pbs/dask-scratch-space/worker-sps1c35x
2024-02-13 18:42:42,851 - distributed.worker - INFO - -------------------------------------------------
2024-02-13 18:42:44,015 - distributed.worker - INFO - Starting Worker plugin shuffle
2024-02-13 18:42:44,016 - distributed.worker - INFO -         Registered to:     tcp://10.6.121.2:33939
2024-02-13 18:42:44,017 - distributed.worker - INFO - -------------------------------------------------
2024-02-13 18:42:44,018 - distributed.core - INFO - Starting established connection to tcp://10.6.121.2:33939
2024-02-13 18:47:02,926 - distributed._signals - INFO - Received signal SIGTERM (15)
2024-02-13 18:47:02,927 - distributed.nanny - INFO - Closing Nanny at 'tcp://10.6.121.2:33353'. Reason: signal-15
2024-02-13 18:47:02,928 - distributed.nanny - INFO - Nanny asking worker to close. Reason: signal-15
2024-02-13 18:47:02,944 - distributed.nanny - INFO - Worker process 4006956 was killed by signal 15
2024-02-13 18:47:02,946 - distributed.dask_worker - INFO - End worker

Hi @tirohia, welcome to Dask Discourse forum!

I’m glad you found a way to use dask-jobqueue on your cluster, it seems there are a lot of system-specific quirks to handle!

Regarding the sklearn problem, I would recommend doing some cross testing here. Maybe you should start by testing it on a LocalCluster first; that might make things simpler. I would also recommend testing a basic example like the GridSearch on the Dask Examples website.
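
For example, something along these lines (made-up data and estimator, just to exercise the joblib/Dask backend) should work out of the box and show tasks landing on the workers:

from dask.distributed import Client, LocalCluster
from joblib import parallel_backend
from sklearn.datasets import make_classification
from sklearn.model_selection import GridSearchCV
from sklearn.svm import SVC

# Small local cluster: same API as PBSCluster, but everything runs on one machine.
cluster = LocalCluster(n_workers=2, threads_per_worker=1)
client = Client(cluster)

X, y = make_classification(n_samples=200, n_features=20, random_state=0)
param_grid = {"C": [0.1, 1, 10], "kernel": ["rbf", "linear"]}

# GridSearchCV parallelises over parameter combinations through joblib,
# so with the dask backend those fits should show up as tasks on the workers.
with parallel_backend("dask"):
    search = GridSearchCV(SVC(), param_grid, cv=3, n_jobs=-1)
    search.fit(X, y)

print(search.best_params_)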

From what I know, if you have a Client declared in your Python main process, you shouldn’t have to use the scheduler_host kwarg for sklearn to use it in the context manager.
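
i.e. with the client you already created from the PBSCluster, this should be enough:

# The 'dask' joblib backend picks up the current Client automatically,
# so no scheduler_host argument is needed.
with parallel_backend('dask'):
    differentialMethylation.fit(filteredDataset, yValues)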

I’m not sure what the requirements are for an estimator to benefit from sklearn’s joblib (and thus Dask) support, but I don’t think it comes for free. The model has to implement something itself; sklearn won’t just submit a single fit call to Dask if the estimator doesn’t parallelise internally.
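
To illustrate (a toy sketch, not your actual class): the fit method has to fan work out through joblib itself, e.g. via joblib.Parallel, otherwise there is nothing for the Dask backend to dispatch:

from joblib import Parallel, delayed
from sklearn.base import BaseEstimator, TransformerMixin

class ToyPerFeatureSelector(BaseEstimator, TransformerMixin):
    """Toy transformer whose fit fans out one joblib task per feature."""

    def fit(self, X, y=None):
        # Because this loop goes through joblib.Parallel, a surrounding
        # parallel_backend('dask') context can send each call to a Dask worker.
        self.scores_ = Parallel(n_jobs=-1)(
            delayed(self._score_feature)(X[:, i], y) for i in range(X.shape[1])
        )
        return self

    @staticmethod
    def _score_feature(column, y):
        return float(column.var())  # placeholder scoring

    def transform(self, X):
        return X  # no-op for the sake of the example

With a class like this, the parallel_backend('dask') context manager has actual joblib tasks to route to the workers.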

By default all outputs are redirected to stderr, so this behaviour is normal.

Morena.

In the end it was indeed a little more complicated - as a beginner it wasn’t obvious to me that using sklearn and joblib would require more work.

I stripped everything back and made a very simple dask-aware transformer on a LocalCluster, which worked - I even figured out delayed tasks and compute. Still haven’t quite grokked scattering and futures, but I managed to get a minimal working example of a custom sklearn transformer going.
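
For reference, the minimal dask-aware transformer I mean looked roughly like this (simplified, with a made-up computation standing in for the real one):

import dask
import numpy as np
from sklearn.base import BaseEstimator, TransformerMixin

class DelayedMeanCentre(BaseEstimator, TransformerMixin):
    """Minimal dask-aware transformer: one delayed task per column, then compute."""

    def fit(self, X, y=None):
        # dask.compute() sends these tasks to whichever cluster the current
        # Client is attached to (a LocalCluster in my tests).
        tasks = [dask.delayed(np.mean)(X[:, i]) for i in range(X.shape[1])]
        self.means_ = np.array(dask.compute(*tasks))
        return self

    def transform(self, X):
        return X - self.means_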

I’m probably about to abandon my attempts to get dask working here though; I’m spending too much time on it. I’ve now found that dask and rpy2 don’t play well together. And while I could still use the sklearn pipeline without making the initial custom transformer dask aware (having that part run outside the cluster, and the rest of the pipeline on it), I’ve not been able to get either the sklearn or the dask_ml grid search to work with multiple options for any given parameter.

Those are different questions though so I’ll start a new thread.
