Cannot run Dask on a cluster with LSF

I am a beginner with Dask. I am trying to test Dask for data computation on a supercomputer with an LSF scheduler.

Each node has 40 cores, with about 160 GB of available memory.

What I have tried:

cluster = LSFCluster(queue='short',
                     cores=40,
                     interface='ib0',
                     memory='300 GB',
                     processes=80,
                     ncpus=80,
                     job_directives_skip=['span[hosts=1]', '-W', '-M'],
                     shebang="#!/bin/bash",
                     job_extra_directives=['-R "span[ptile=40]"'],
                     )
cluster.scale(1)
client = Client(cluster)

import time 
from dask.distributed import progress 

def slow_increment(x):
    time.sleep(1)
    return x+1 

futures = client.map(slow_increment, range(5000))
progress(futures)

The job script (the header is the same as what the administrator recommends):

#!/bin/bash

#BSUB -J dask-worker
#BSUB -q short
#BSUB -n 80
#BSUB -R "span[ptile=40]"
#BSUB -o %J.out
#BSUB -e %J.err

/work/myusername/softwares/python/anaconda3/2022.10/envs/test_multinode/bin/python -m distributed.cli.dask_worker tcp://10.2.x.xx:41754 --nthreads 0 --nworkers 80 --memory-limit 3.4GiB --name dummy-name --nanny --death-timeout 60 --interface ib0

I hope this job will use two nodes, each with 40 cores and 150 GB of memory.

However, when I check each node with the top command, one of the nodes has many tasks while the other has none.

So my questions are:

  1. Is my setup correct? The reason I set the LSFCluster parameters this way is to ensure that the job script is exactly the same as the example given by the administrator.
  2. Why is the second node not working?
  3. I wonder if I am understanding this right: in the documentation, the term “job” refers to each task that we submit to the cluster, and there is no data exchange possible between these tasks.

Thank you in advance!

Hi @xuJ14, welcome to the Dask community!

dask-jobqueue does not handle multi-node jobs; each job it submits can only start workers on a single node, even if LSF offers more advanced features.

I’m not sure what restrictions there are on this supercomputer, but surely you can submit jobs on single nodes?

This should look something like:

cluster = LSFCluster(queue='short',
                     cores=40,
                     interface='ib0', 
                     memory='150 GB',
                     processes=40,
                     shebang="#!/bin/bash",
                     )
cluster.scale(2)
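
Before scaling, you can also print the job script that dask-jobqueue generates and compare it with your administrator's template. A quick check, assuming the `cluster` object defined above:

# Show the submission script that will be piped to bsub,
# so it can be compared with the administrator's recommended header.
print(cluster.job_script())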

I can submit jobs on a single node.

However, after I run your code on this cluster, it says:

2024-01-15 16:00:45,569 - tornado.application - ERROR - Exception in callback functools.partial(<bound method IOLoop._discard_future_result of <tornado.platform.asyncio.AsyncIOMainLoop object at 0x2ac9285e5340>>, <Task finished name='Task-25228' coro=<SpecCluster._correct_state_internal() done, defined at /work/my_account/softwares/python/anaconda3/2022.10/envs/test_multinode/lib/python3.9/site-packages/distributed/deploy/spec.py:346> exception=RuntimeError('Command exited with non-zero exit code.\nExit code: 255\nCommand:\nbsub< /tmp/tmprt4h4w0g.sh 2> /dev/null\nstdout:\n\nstderr:\n\n')>)
Traceback (most recent call last):
  File "/work/my_account/softwares/python/anaconda3/2022.10/envs/test_multinode/lib/python3.9/site-packages/tornado/ioloop.py", line 738, in _run_callback
    ret = callback()
  File "/work/my_account/softwares/python/anaconda3/2022.10/envs/test_multinode/lib/python3.9/site-packages/tornado/ioloop.py", line 762, in _discard_future_result
    future.result()
  File "/work/my_account/softwares/python/anaconda3/2022.10/envs/test_multinode/lib/python3.9/site-packages/distributed/deploy/spec.py", line 390, in _correct_state_internal
    await asyncio.gather(*worker_futs)
  File "/work/my_account/softwares/python/anaconda3/2022.10/envs/test_multinode/lib/python3.9/asyncio/tasks.py", line 688, in _wrap_awaitable
    return (yield from awaitable.__await__())
  File "/work/my_account/softwares/python/anaconda3/2022.10/envs/test_multinode/lib/python3.9/site-packages/distributed/deploy/spec.py", line 74, in _
    await self.start()
  File "/work/my_account/softwares/python/anaconda3/2022.10/envs/test_multinode/lib/python3.9/site-packages/dask_jobqueue/core.py", line 411, in start
    out = await self._submit_job(fn)
  File "/work/my_account/softwares/python/anaconda3/2022.10/envs/test_multinode/lib/python3.9/site-packages/dask_jobqueue/lsf.py", line 116, in _submit_job
    return self._call(piped_cmd, shell=True)
  File "/work/my_account/softwares/python/anaconda3/2022.10/envs/test_multinode/lib/python3.9/site-packages/dask_jobqueue/core.py", line 489, in _call
    raise RuntimeError(
RuntimeError: Command exited with non-zero exit code.
Exit code: 255
Command:
bsub< /tmp/tmprt4h4w0g.sh 2> /dev/null
stdout:

stderr:

When I change the code to:

cluster = LSFCluster(queue='debug',
                     cores=40,
                     interface='ib0', 
                     memory='150 GB',
                     processes=40,
                     shebang="#!/bin/bash",
                     job_directives_skip=[ '-W'],
                     )
cluster.scale(2)

There’s no error message; however, when I check “bjobs -a”, it shows:

JOBID      STAT  QUEUE      FROM_HOST   EXEC_HOST   JOB_NAME   SUBMIT_TIME
6287768    RUN   debug      login01     40*r01n01   *sk-worker Jan 15 16:23

This means only one exec_host was used and one job was launched. But I think cluster.scale(2) should launch two, right?

I am the only user of this queue on the cluster, and there are definitely at least two nodes available.

How should I fix this? Thank you for your reply!

By default, scale() takes the number of worker processes as its argument. In your case, you have 40 processes per job, so a single job is launched.

cluster.scale(jobs=2)

should do the trick.
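
For reference, here is a minimal sketch of both spellings, assuming `cluster` is the single-node LSFCluster configuration above (40 processes per job). The host check at the end is just one way to confirm that the workers landed on two different nodes:

from dask.distributed import Client

# Equivalent ways to ask for two single-node jobs of 40 worker processes each:
cluster.scale(jobs=2)   # scale by number of jobs (bsub submissions)
# cluster.scale(80)     # or scale by number of worker processes (80 / 40 = 2 jobs)

client = Client(cluster)
# Block until all 80 worker processes have connected to the scheduler.
client.wait_for_workers(80)

# Each worker reports its host; two distinct hostnames are expected here.
hosts = {w["host"] for w in client.scheduler_info()["workers"].values()}
print(hosts)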