Hi,
I am trying to use dask-mpi to accelerate my data science workloads.
Here is my daskmpi-example.py:
from dask_mpi import initialize
initialize(interface="ib0.8066", nthreads=1)
import dask.array as da
from dask.distributed import Client, performance_report
client = Client()
def example_function():
    print(f"start example")
    x = da.random.random((100_000, 100_000, 10), chunks=(10_000, 10_000, 5))
    y = da.random.random((100_000, 100_000, 10), chunks=(10_000, 10_000, 5))
    z = (da.arcsin(x) + da.arccos(y)).sum(axis=(1, 2))
    with performance_report(filename="dask-report_mpi.html"):
        result = z.compute()

if __name__ == "__main__":
    example_function()
I start the script with:
export OMP_NUM_THREADS=1
export MKL_NUM_THREADS=1
export OPENBLAS_NUM_THREADS=1
source activate /fs/fast/u20200002/envs/ds
. /opt/app/intel/2023/mpi/2021.9.0/env/vars.sh
mpirun -np 4 python daskmpi-example.py
This is the output I get:
2023-05-03 08:46:00,688 - distributed.http.proxy - INFO - To route to workers diagnostics web server please install jupyter-server-proxy: python -m pip install jupyter-server-proxy
2023-05-03 08:46:00,726 - distributed.scheduler - INFO - State start
2023-05-03 08:46:00,728 - distributed.scheduler - INFO - Scheduler at: tcp://192.168.1.75:43765
2023-05-03 08:46:00,729 - distributed.scheduler - INFO - dashboard at: http://192.168.1.75:8787/status
2023-05-03 08:46:00,774 - distributed.worker - INFO - Start worker at: tcp://192.168.1.75:38361
2023-05-03 08:46:00,774 - distributed.worker - INFO - Listening to: tcp://192.168.1.75:38361
2023-05-03 08:46:00,774 - distributed.worker - INFO - Worker name: 2
2023-05-03 08:46:00,774 - distributed.worker - INFO - dashboard at: 192.168.1.75:44295
2023-05-03 08:46:00,774 - distributed.worker - INFO - Waiting to connect to: tcp://192.168.1.75:43765
2023-05-03 08:46:00,774 - distributed.worker - INFO - -------------------------------------------------
2023-05-03 08:46:00,774 - distributed.worker - INFO - Threads: 1
2023-05-03 08:46:00,774 - distributed.worker - INFO - Memory: 29.88 GiB
2023-05-03 08:46:00,774 - distributed.worker - INFO - Local Directory: /tmp/dask-worker-space/worker-qwhqeqw2
2023-05-03 08:46:00,774 - distributed.worker - INFO - -------------------------------------------------
2023-05-03 08:46:00,787 - distributed.worker - INFO - Start worker at: tcp://192.168.1.75:33285
2023-05-03 08:46:00,787 - distributed.worker - INFO - Listening to: tcp://192.168.1.75:33285
2023-05-03 08:46:00,787 - distributed.worker - INFO - Worker name: 3
2023-05-03 08:46:00,787 - distributed.worker - INFO - dashboard at: 192.168.1.75:35371
2023-05-03 08:46:00,787 - distributed.worker - INFO - Waiting to connect to: tcp://192.168.1.75:43765
2023-05-03 08:46:00,787 - distributed.worker - INFO - -------------------------------------------------
2023-05-03 08:46:00,787 - distributed.worker - INFO - Threads: 1
2023-05-03 08:46:00,788 - distributed.worker - INFO - Memory: 29.88 GiB
2023-05-03 08:46:00,788 - distributed.worker - INFO - Local Directory: /tmp/dask-worker-space/worker-cil0fsqr
2023-05-03 08:46:00,788 - distributed.worker - INFO - -------------------------------------------------
2023-05-03 08:46:01,152 - distributed.scheduler - INFO - Register worker <WorkerState 'tcp://192.168.1.75:33285', name: 3, status: init, memory: 0, processing: 0>
2023-05-03 08:46:01,526 - distributed.scheduler - INFO - Starting worker compute stream, tcp://192.168.1.75:33285
2023-05-03 08:46:01,526 - distributed.core - INFO - Starting established connection to tcp://192.168.1.75:41296
2023-05-03 08:46:01,527 - distributed.worker - INFO - Registered to: tcp://192.168.1.75:43765
2023-05-03 08:46:01,527 - distributed.scheduler - INFO - Register worker <WorkerState 'tcp://192.168.1.75:38361', name: 2, status: init, memory: 0, processing: 0>
2023-05-03 08:46:01,527 - distributed.worker - INFO - -------------------------------------------------
2023-05-03 08:46:01,528 - distributed.scheduler - INFO - Starting worker compute stream, tcp://192.168.1.75:38361
2023-05-03 08:46:01,528 - distributed.core - INFO - Starting established connection to tcp://192.168.1.75:41294
2023-05-03 08:46:01,528 - distributed.scheduler - INFO - Receive client connection: Client-e00f2527-e94b-11ed-b0b5-00001434fe80
2023-05-03 08:46:01,528 - distributed.core - INFO - Starting established connection to tcp://192.168.1.75:41298
2023-05-03 08:46:01,528 - distributed.core - INFO - Starting established connection to tcp://192.168.1.75:43765
2023-05-03 08:46:01,528 - distributed.worker - INFO - Registered to: tcp://192.168.1.75:43765
2023-05-03 08:46:01,529 - distributed.worker - INFO - -------------------------------------------------
2023-05-03 08:46:01,530 - distributed.core - INFO - Starting established connection to tcp://192.168.1.75:43765
2023-05-03 08:46:01,552 - distributed.worker - INFO - Run out-of-band function 'lambda'
2023-05-03 08:46:27,649 - distributed.core - INFO - Event loop was unresponsive in Worker for 17.01s. This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
2023-05-03 08:46:29,578 - distributed.core - INFO - Event loop was unresponsive in Worker for 19.06s. This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
It seems that Dask started the scheduler and the workers, but then it just keeps waiting for something and the computation never finishes.
A simple hello-world mpi4py script runs fine in the same environment, so MPI itself seems to work. I also checked the source code of dask-mpi: it uses mpi4py to start the scheduler and the workers, with each MPI rank taking a different role. As far as I understand, that matches the log above, where the two workers are named 2 and 3 when launching with -np 4 (rank 0 runs the scheduler and rank 1 runs my client script).
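For reference, the mpi4py test I ran was essentially this minimal script (reconstructed from memory; the file name is just what I called it):

# hello-mpi.py -- sanity check that mpirun + mpi4py work in this environment
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()   # rank of this process within the communicator
size = comm.Get_size()   # total number of MPI processes
print(f"Hello from rank {rank} of {size}")

Launched with mpirun -np 4 python hello-mpi.py, all four ranks print their greeting, so the MPI setup itself looks fine. The problem only shows up with dask-mpi.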