Hi all,
I am trying to launch a LocalCluster inside an MPI application, so that the LocalCluster handles local multiprocessing parallelism on each node, while communication between the ranks across nodes is handled with mpi4py in the larger application. It seems that launching the LocalCluster involves calling MPI_Init_thread() a second time, which fails. I don't understand why Dask would need to call MPI_Init_thread() at all, since the cluster is only used locally.
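For context, the inter-rank layer I have in mind looks roughly like this (illustrative sketch only; local_results is a placeholder for whatever each rank's LocalCluster computes, not code from the failing script):

from mpi4py import MPI

comm = MPI.COMM_WORLD
local_results = ...  # stand-in for results gathered from this rank's Dask workers
all_results = comm.allgather(local_results)  # exchange per-node results between ranks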
Here’s a sketch of what I’m trying to do:
import numpy as np
from dask.distributed import LocalCluster, Client, Worker, WorkerPlugin, get_worker
from mpi4py import MPI


class MyWorker(WorkerPlugin):
    def __init__(self, num_elements):
        self.num_elements = num_elements

    def setup(self, worker: Worker):
        """Set up some computationally intensive data on the local process."""
        self.private_data = np.random.random(self.num_elements)
        print("Initialized data")

    @staticmethod
    def calculate(client_sent_data):
        """Do some work on the client_sent_data + private, in-process data."""
        plugin = get_worker().plugins["myworker"]
        return client_sent_data, np.sum(plugin.private_data)


if __name__ == "__main__":
    rank = MPI.COMM_WORLD.Get_rank()
    cluster = LocalCluster(
        n_workers=2, threads_per_worker=1, processes=True
    )  # Workers need their own copy of private_data, not a shared one.
    client = Client(cluster, scheduler_file=f"~/dask_scheduler.{rank}.json")
    plugin = MyWorker(10)
    client.register_worker_plugin(plugin, name="myworker")
    print(f"Rank {rank} has client {client} and local plugins running")
    futures = client.map(MyWorker.calculate, list(np.random.random(4)))
    for f in futures:
        print(f.result())
    client.close()
    cluster.close()
This works fine on a single rank:
$ python localcluster.py
Initialized data
Initialized data
Rank 0 has client <Client: 'tcp://127.0.0.1:39066' processes=2 threads=2, memory=187.41 GiB> and local plugins running
(0.44914885863268206, 4.842979322212476)
(0.31209960091496314, 6.342318680155895)
(0.37816662877881313, 4.842979322212476)
(0.9322436968566397, 6.342318680155895)
But it crashes with MPI_Init_thread() errors when run via mpirun:
$ mpirun -np 2 python localcluster.py
...
...
...
Error in system call pthread_mutex_destroy: Device or resource busy
../../src/mpi/init/init_thread_cs.c:66
Abort(1090575) on node 0 (rank 0 in comm 0): Fatal error in PMPI_Init_thread: Other MPI error, error stack:
MPIR_Init_thread(143):
MPID_Init(1221)......:
MPIR_pmi_init(130)...: PMI_Get_appnum returned -1
... and so on ...
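My working theory is that the spawned worker processes re-import this script, so the top-level from mpi4py import MPI runs again in each child and auto-calls MPI_Init_thread(). If that's right, deferring initialization with mpi4py's documented rc switches might help; an untested sketch:

import mpi4py
mpi4py.rc.initialize = False  # don't call MPI_Init_thread() at import time
mpi4py.rc.finalize = False    # don't call MPI_Finalize() at exit either

from mpi4py import MPI

if __name__ == "__main__":
    # Only the mpirun-launched parent initializes MPI; spawned Dask workers
    # re-import this module under a different __name__ and never reach here.
    MPI.Init_thread()
    rank = MPI.COMM_WORLD.Get_rank()
    ...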
Has anybody done something like this before? Am I missing something? Or is it just a bad idea to use Dask for local multiprocessing?
Any help would be appreciated,
– Jeremy