Problem starting dask-mpi

Hi,

I am trying to use dask-mpi to accelerate my data science workloads.
Here is my daskmpi-example.py:

from dask_mpi import initialize
initialize(interface="ib0.8066", nthreads=1)

import dask.array as da
from dask.distributed import Client, performance_report


client = Client()


def example_function():
    print(f"start example")
    x = da.random.random((100_000, 100_000, 10), chunks=(10_000, 10_000, 5))
    y = da.random.random((100_000, 100_000, 10), chunks=(10_000, 10_000, 5))
    z = (da.arcsin(x) + da.arccos(y)).sum(axis=(1, 2))

    with performance_report(filename="dask-report_mpi.html"):
        result = z.compute()


if __name__ == "__main__":
    example_function()

I start the script with:

export OMP_NUM_THREADS=1
export MKL_NUM_THREADS=1
export OPENBLAS_NUM_THREADS=1

source activate /fs/fast/u20200002/envs/ds
. /opt/app/intel/2023/mpi/2021.9.0/env/vars.sh

mpirun -np 4 python daskmpi-example.py

What I got is:

2023-05-03 08:46:00,688 - distributed.http.proxy - INFO - To route to workers diagnostics web server please install jupyter-server-proxy: python -m pip install jupyter-server-proxy
2023-05-03 08:46:00,726 - distributed.scheduler - INFO - State start
2023-05-03 08:46:00,728 - distributed.scheduler - INFO -   Scheduler at:  tcp://192.168.1.75:43765
2023-05-03 08:46:00,729 - distributed.scheduler - INFO -   dashboard at:  http://192.168.1.75:8787/status
2023-05-03 08:46:00,774 - distributed.worker - INFO -       Start worker at:   tcp://192.168.1.75:38361
2023-05-03 08:46:00,774 - distributed.worker - INFO -          Listening to:   tcp://192.168.1.75:38361
2023-05-03 08:46:00,774 - distributed.worker - INFO -           Worker name:                          2
2023-05-03 08:46:00,774 - distributed.worker - INFO -          dashboard at:         192.168.1.75:44295
2023-05-03 08:46:00,774 - distributed.worker - INFO - Waiting to connect to:   tcp://192.168.1.75:43765
2023-05-03 08:46:00,774 - distributed.worker - INFO - -------------------------------------------------
2023-05-03 08:46:00,774 - distributed.worker - INFO -               Threads:                          1
2023-05-03 08:46:00,774 - distributed.worker - INFO -                Memory:                  29.88 GiB
2023-05-03 08:46:00,774 - distributed.worker - INFO -       Local Directory: /tmp/dask-worker-space/worker-qwhqeqw2
2023-05-03 08:46:00,774 - distributed.worker - INFO - -------------------------------------------------
2023-05-03 08:46:00,787 - distributed.worker - INFO -       Start worker at:   tcp://192.168.1.75:33285
2023-05-03 08:46:00,787 - distributed.worker - INFO -          Listening to:   tcp://192.168.1.75:33285
2023-05-03 08:46:00,787 - distributed.worker - INFO -           Worker name:                          3
2023-05-03 08:46:00,787 - distributed.worker - INFO -          dashboard at:         192.168.1.75:35371
2023-05-03 08:46:00,787 - distributed.worker - INFO - Waiting to connect to:   tcp://192.168.1.75:43765
2023-05-03 08:46:00,787 - distributed.worker - INFO - -------------------------------------------------
2023-05-03 08:46:00,787 - distributed.worker - INFO -               Threads:                          1
2023-05-03 08:46:00,788 - distributed.worker - INFO -                Memory:                  29.88 GiB
2023-05-03 08:46:00,788 - distributed.worker - INFO -       Local Directory: /tmp/dask-worker-space/worker-cil0fsqr
2023-05-03 08:46:00,788 - distributed.worker - INFO - -------------------------------------------------
2023-05-03 08:46:01,152 - distributed.scheduler - INFO - Register worker <WorkerState 'tcp://192.168.1.75:33285', name: 3, status: init, memory: 0, processing: 0>
2023-05-03 08:46:01,526 - distributed.scheduler - INFO - Starting worker compute stream, tcp://192.168.1.75:33285
2023-05-03 08:46:01,526 - distributed.core - INFO - Starting established connection to tcp://192.168.1.75:41296
2023-05-03 08:46:01,527 - distributed.worker - INFO -         Registered to:   tcp://192.168.1.75:43765
2023-05-03 08:46:01,527 - distributed.scheduler - INFO - Register worker <WorkerState 'tcp://192.168.1.75:38361', name: 2, status: init, memory: 0, processing: 0>
2023-05-03 08:46:01,527 - distributed.worker - INFO - -------------------------------------------------
2023-05-03 08:46:01,528 - distributed.scheduler - INFO - Starting worker compute stream, tcp://192.168.1.75:38361
2023-05-03 08:46:01,528 - distributed.core - INFO - Starting established connection to tcp://192.168.1.75:41294
2023-05-03 08:46:01,528 - distributed.scheduler - INFO - Receive client connection: Client-e00f2527-e94b-11ed-b0b5-00001434fe80
2023-05-03 08:46:01,528 - distributed.core - INFO - Starting established connection to tcp://192.168.1.75:41298
2023-05-03 08:46:01,528 - distributed.core - INFO - Starting established connection to tcp://192.168.1.75:43765
2023-05-03 08:46:01,528 - distributed.worker - INFO -         Registered to:   tcp://192.168.1.75:43765
2023-05-03 08:46:01,529 - distributed.worker - INFO - -------------------------------------------------
2023-05-03 08:46:01,530 - distributed.core - INFO - Starting established connection to tcp://192.168.1.75:43765
2023-05-03 08:46:01,552 - distributed.worker - INFO - Run out-of-band function 'lambda'
2023-05-03 08:46:27,649 - distributed.core - INFO - Event loop was unresponsive in Worker for 17.01s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2023-05-03 08:46:29,578 - distributed.core - INFO - Event loop was unresponsive in Worker for 19.06s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.

It seems that Dask started the scheduler, but then it just kept waiting for something.

I can run a simple hello-world mpi4py script and it works. I also checked the source code of dask-mpi; it uses mpi4py to start the scheduler and the workers.
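The sanity check I ran was essentially this kind of minimal mpi4py script (a sketch, not my exact file):

# hello_mpi.py - minimal mpi4py check: every rank reports in
from mpi4py import MPI

comm = MPI.COMM_WORLD
print(f"hello from rank {comm.Get_rank()} of {comm.Get_size()}")

Launched with mpirun -np 4 python hello_mpi.py it prints one line per rank, so MPI itself seems fine.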

Hi @luweizheng, welcome to the Dask community!

Looking at the logs you provided, it seems to me that the Dask cluster started properly. You even have some logs indicating that a computation is holding the GIL a bit too long.

You don’t get the result you’ve asked for? How does it behave if you launch a smaller computation?
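For instance, something like this (just a sketch with much smaller arrays; the exact sizes don't matter):

# same expression as in your script, but small enough to finish in seconds
import dask.array as da

x = da.random.random((1_000, 1_000, 10), chunks=(100, 100, 5))
y = da.random.random((1_000, 1_000, 10), chunks=(100, 100, 5))
z = (da.arcsin(x) + da.arccos(y)).sum(axis=(1, 2))
print(z.compute())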

This log also indicates that you've used Client.run() to submit a lambda function, is that so?
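For reference, that "Run out-of-band function" worker log is typically produced by a call like this (a hypothetical sketch, not necessarily what your code does):

# Client.run() executes a function "out of band" on every worker;
# each worker then logs "Run out-of-band function 'lambda'"
from dask.distributed import Client

client = Client()  # with dask-mpi, this connects to the scheduler it started
client.run(lambda: print("hello from a worker"))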

This log also indicates that you've used Client.run() to submit a lambda function, is that so?

Yes, I submitted an example function and put the example Python script in this post. I do not see the print(f"start example") output in my logs. Weird.

from dask_mpi import initialize
initialize(interface="ib0.8066", nthreads=1)

import dask.array as da
from dask.distributed import Client, performance_report


client = Client()


def example_function():
    print(f"start example")
    x = da.random.random((100_000, 100_000, 10), chunks=(10_000, 10_000, 5))
    y = da.random.random((100_000, 100_000, 10), chunks=(10_000, 10_000, 5))
    z = (da.arcsin(x) + da.arccos(y)).sum(axis=(1, 2))

    with performance_report(filename="dask-report_mpi.html"):
        result = z.compute()


if __name__ == "__main__":
    example_function()

After I changed the random arrays to a smaller size, the program can now get into the main function. However, I run into other problems. Maybe it's not a dask-mpi problem but a dask.distributed one?

2023-05-04 14:27:05,274 - distributed.http.proxy - INFO - To route to workers diagnostics web server please install jupyter-server-proxy: python -m pip install jupyter-server-proxy
/fs/fast/u20200002/envs/ds/lib/python3.10/site-packages/distributed/node.py:182: UserWarning: Port 8787 is already in use.
Perhaps you already have a cluster running?
Hosting the HTTP server on port 33373 instead
  warnings.warn(
2023-05-04 14:27:05,314 - distributed.scheduler - INFO - State start
2023-05-04 14:27:05,316 - distributed.scheduler - INFO -   Scheduler at:  tcp://192.168.1.79:43153
2023-05-04 14:27:05,316 - distributed.scheduler - INFO -   dashboard at:  http://192.168.1.79:33373/status
2023-05-04 14:27:05,356 - distributed.worker - INFO -       Start worker at:   tcp://192.168.1.79:43935
2023-05-04 14:27:05,357 - distributed.worker - INFO -          Listening to:   tcp://192.168.1.79:43935
2023-05-04 14:27:05,357 - distributed.worker - INFO -           Worker name:                          2
2023-05-04 14:27:05,357 - distributed.worker - INFO -          dashboard at:         192.168.1.79:41679
2023-05-04 14:27:05,357 - distributed.worker - INFO - Waiting to connect to:   tcp://192.168.1.79:43153
2023-05-04 14:27:05,357 - distributed.worker - INFO - -------------------------------------------------
2023-05-04 14:27:05,357 - distributed.worker - INFO -               Threads:                          1
2023-05-04 14:27:05,357 - distributed.worker - INFO -                Memory:                  29.88 GiB
2023-05-04 14:27:05,357 - distributed.worker - INFO -       Local Directory: /tmp/dask-worker-space/worker-0cu9j4k1
2023-05-04 14:27:05,357 - distributed.worker - INFO - -------------------------------------------------
2023-05-04 14:27:05,357 - distributed.worker - INFO -       Start worker at:   tcp://192.168.1.79:44407
2023-05-04 14:27:05,357 - distributed.worker - INFO -          Listening to:   tcp://192.168.1.79:44407
2023-05-04 14:27:05,357 - distributed.worker - INFO -           Worker name:                          3
2023-05-04 14:27:05,357 - distributed.worker - INFO -          dashboard at:         192.168.1.79:42029
2023-05-04 14:27:05,357 - distributed.worker - INFO - Waiting to connect to:   tcp://192.168.1.79:43153
2023-05-04 14:27:05,357 - distributed.worker - INFO - -------------------------------------------------
2023-05-04 14:27:05,358 - distributed.worker - INFO -               Threads:                          1
2023-05-04 14:27:05,358 - distributed.worker - INFO -                Memory:                  29.88 GiB
2023-05-04 14:27:05,358 - distributed.worker - INFO -       Local Directory: /tmp/dask-worker-space/worker-plt04htm
2023-05-04 14:27:05,358 - distributed.worker - INFO - -------------------------------------------------
2023-05-04 14:27:05,754 - distributed.scheduler - INFO - Register worker <WorkerState 'tcp://192.168.1.79:43935', name: 2, status: init, memory: 0, processing: 0>
2023-05-04 14:27:06,155 - distributed.scheduler - INFO - Starting worker compute stream, tcp://192.168.1.79:43935
2023-05-04 14:27:06,155 - distributed.core - INFO - Starting established connection to tcp://192.168.1.79:57026
2023-05-04 14:27:06,156 - distributed.worker - INFO -         Registered to:   tcp://192.168.1.79:43153
2023-05-04 14:27:06,156 - distributed.worker - INFO - -------------------------------------------------
2023-05-04 14:27:06,156 - distributed.scheduler - INFO - Register worker <WorkerState 'tcp://192.168.1.79:44407', name: 3, status: init, memory: 0, processing: 0>
2023-05-04 14:27:06,157 - distributed.scheduler - INFO - Starting worker compute stream, tcp://192.168.1.79:44407
2023-05-04 14:27:06,157 - distributed.core - INFO - Starting established connection to tcp://192.168.1.79:57028
2023-05-04 14:27:06,157 - distributed.scheduler - INFO - Receive client connection: Client-b0516952-ea44-11ed-a5b0-00001434fe80
2023-05-04 14:27:06,158 - distributed.core - INFO - Starting established connection to tcp://192.168.1.79:43153
2023-05-04 14:27:06,158 - distributed.core - INFO - Starting established connection to tcp://192.168.1.79:57030
2023-05-04 14:27:06,158 - distributed.worker - INFO -         Registered to:   tcp://192.168.1.79:43153
2023-05-04 14:27:06,158 - distributed.worker - INFO - -------------------------------------------------
2023-05-04 14:27:06,160 - distributed.core - INFO - Starting established connection to tcp://192.168.1.79:43153
start main function
2023-05-04 14:27:06,177 - distributed.scheduler - INFO - Receive client connection: Client-b0b5c5d6-ea44-11ed-a5b0-00001434fe80
2023-05-04 14:27:06,178 - distributed.core - INFO - Starting established connection to tcp://192.168.1.79:57034
2023-05-04 14:27:06,180 - distributed.worker - INFO - Run out-of-band function 'stop'
2023-05-04 14:27:06,181 - distributed.scheduler - INFO - Scheduler closing...
2023-05-04 14:27:06,181 - distributed.scheduler - INFO - Scheduler closing all comms
2023-05-04 14:27:06,182 - distributed.worker - INFO - Stopping worker at tcp://192.168.1.79:43935. Reason: scheduler-close
2023-05-04 14:27:06,182 - distributed.core - INFO - Connection to tcp://192.168.1.79:57026 has been closed.
2023-05-04 14:27:06,183 - distributed.worker - INFO - Stopping worker at tcp://192.168.1.79:44407. Reason: scheduler-close
2023-05-04 14:27:06,183 - distributed.scheduler - INFO - Remove worker <WorkerState 'tcp://192.168.1.79:43935', name: 2, status: running, memory: 0, processing: 0>
2023-05-04 14:27:06,183 - distributed.core - INFO - Removing comms to tcp://192.168.1.79:43935
2023-05-04 14:27:06,183 - distributed.core - INFO - Connection to tcp://192.168.1.79:57028 has been closed.
2023-05-04 14:27:06,183 - distributed.scheduler - INFO - Remove worker <WorkerState 'tcp://192.168.1.79:44407', name: 3, status: running, memory: 0, processing: 0>
2023-05-04 14:27:06,183 - distributed.core - INFO - Removing comms to tcp://192.168.1.79:44407
2023-05-04 14:27:06,183 - distributed.scheduler - INFO - Lost all workers
2023-05-04 14:27:06,183 - distributed.core - INFO - Received 'close-stream' from tcp://192.168.1.79:43153; closing.
2023-05-04 14:27:06,184 - distributed.core - INFO - Received 'close-stream' from tcp://192.168.1.79:43153; closing.
2023-05-04 14:27:06,184 - distributed.batched - INFO - Batched Comm Closed <TCP (closed) Worker->Scheduler local=tcp://192.168.1.79:57026 remote=tcp://192.168.1.79:43153>
Traceback (most recent call last):
  File "/fs/fast/u20200002/envs/ds/lib/python3.10/site-packages/distributed/batched.py", line 115, in _background_send
    nbytes = yield coro
  File "/fs/fast/u20200002/envs/ds/lib/python3.10/site-packages/tornado/gen.py", line 767, in run
    value = future.result()
  File "/fs/fast/u20200002/envs/ds/lib/python3.10/site-packages/distributed/comm/tcp.py", line 269, in write
    raise CommClosedError()
distributed.comm.core.CommClosedError
2023-05-04 14:27:06,184 - distributed.batched - INFO - Batched Comm Closed <TCP (closed) Worker->Scheduler local=tcp://192.168.1.79:57028 remote=tcp://192.168.1.79:43153>
Traceback (most recent call last):
  File "/fs/fast/u20200002/envs/ds/lib/python3.10/site-packages/distributed/batched.py", line 115, in _background_send
    nbytes = yield coro
  File "/fs/fast/u20200002/envs/ds/lib/python3.10/site-packages/tornado/gen.py", line 767, in run
    value = future.result()
  File "/fs/fast/u20200002/envs/ds/lib/python3.10/site-packages/distributed/comm/tcp.py", line 269, in write
    raise CommClosedError()
distributed.comm.core.CommClosedError
2023-05-04 14:27:06,379 - distributed.client - ERROR - 
ConnectionRefusedError: [Errno 111] Connection refused

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/fs/fast/u20200002/envs/ds/lib/python3.10/site-packages/distributed/comm/core.py", line 292, in connect
    comm = await wait_for(
  File "/fs/fast/u20200002/envs/ds/lib/python3.10/site-packages/distributed/utils.py", line 1878, in wait_for
    return await asyncio.wait_for(fut, timeout)
  File "/fs/fast/u20200002/envs/ds/lib/python3.10/asyncio/tasks.py", line 445, in wait_for
    return fut.result()
  File "/fs/fast/u20200002/envs/ds/lib/python3.10/site-packages/distributed/comm/tcp.py", line 511, in connect
    convert_stream_closed_error(self, e)
  File "/fs/fast/u20200002/envs/ds/lib/python3.10/site-packages/distributed/comm/tcp.py", line 142, in convert_stream_closed_error
    raise CommClosedError(f"in {obj}: {exc.__class__.__name__}: {exc}") from exc
distributed.comm.core.CommClosedError: in <distributed.comm.tcp.TCPConnector object at 0x149a765e5bd0>: ConnectionRefusedError: [Errno 111] Connection refused

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/fs/fast/u20200002/envs/ds/lib/python3.10/site-packages/distributed/utils.py", line 754, in wrapper
    return await func(*args, **kwargs)
  File "/fs/fast/u20200002/envs/ds/lib/python3.10/site-packages/distributed/client.py", line 1323, in _reconnect
    await self._ensure_connected(timeout=timeout)
  File "/fs/fast/u20200002/envs/ds/lib/python3.10/site-packages/distributed/client.py", line 1353, in _ensure_connected
    comm = await connect(
  File "/fs/fast/u20200002/envs/ds/lib/python3.10/site-packages/distributed/comm/core.py", line 316, in connect
    await asyncio.sleep(backoff)
  File "/fs/fast/u20200002/envs/ds/lib/python3.10/asyncio/tasks.py", line 605, in sleep
    return await future
asyncio.exceptions.CancelledError

There are still some inconsistencies:

I don't see the "start main function" print in your code snippet.

Also, I see the "Receive client connection" log two times, which is a bit weird.

Maybe we should try to debug step by step: what happens if you remove these two lines:

from dask_mpi import initialize

and

initialize(interface="ib0.8066", nthreads=1)

?

In order to see that, you should try without dask-mpi, with just a LocalCluster.
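Something along these lines, for instance (a sketch with reduced array sizes; adapt as needed):

# same computation, but on a LocalCluster instead of dask-mpi
import dask.array as da
from dask.distributed import Client, LocalCluster, performance_report


if __name__ == "__main__":
    cluster = LocalCluster(n_workers=4, threads_per_worker=1)
    client = Client(cluster)

    x = da.random.random((5_000, 5_000, 10), chunks=(1_000, 1_000, 5))
    y = da.random.random((5_000, 5_000, 10), chunks=(1_000, 1_000, 5))
    z = (da.arcsin(x) + da.arccos(y)).sum(axis=(1, 2))

    with performance_report(filename="dask-report_local.html"):
        result = z.compute()
    print(result)

If this runs fine when started with a plain python call (no mpirun), the problem is more likely on the dask-mpi side than in the computation itself.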