LocalCluster + MPI calls MPI_Init_thread twice?

Hi all,

I am trying to launch a LocalCluster within an MPI application, so that the LocalCluster provides local multiprocessing parallelism on each node, while communication between ranks across nodes is handled with mpi4py in the larger application. It seems that launching the LocalCluster calls MPI_Init_thread() a second time, which fails. I don’t understand why Dask needs to call MPI_Init_thread() if it is only used locally.

Here’s a sketch of what I’m trying to do:

import numpy as np
from dask.distributed import LocalCluster, Client, Worker, WorkerPlugin, get_worker
from mpi4py import MPI

class MyWorker(WorkerPlugin):
    def __init__(self, num_elements):

        self.num_elements = num_elements

    def setup(self, worker: Worker):
        """Set up some computationally intensive data on the local process"""
        self.private_data = np.random.random(self.num_elements)
        print("Initialized data")

    @staticmethod
    def calculate(client_sent_data):
        """Do some work on the client_sent_data + private, in-process data"""
        plugin = get_worker().plugins["myworker"]
        return client_sent_data, np.sum(plugin.private_data)

if __name__ == "__main__":
    rank = MPI.COMM_WORLD.Get_rank()

    cluster = LocalCluster(
        n_workers=2, threads_per_worker=1, processes=True
    )  # Workers need their own copy of private-data, not shared.
    client = Client(cluster, scheduler_file=f"~/dask_scheduler.{rank}.json")

    plugin = MyWorker(10)

    client.register_worker_plugin(plugin, name="myworker")

    print(f"Rank {rank} has client {client} and local plugins running")
    futures = client.map(MyWorker.calculate, [data for data in np.random.random(4)])
    for f in futures:
        print(f.result())


This works fine on a single rank:

$ python localcluster.py
Initialized data
Initialized data
Rank 0 has client <Client: 'tcp://' processes=2 threads=2, memory=187.41 GiB> and local plugins running
(0.44914885863268206, 4.842979322212476)
(0.31209960091496314, 6.342318680155895)
(0.37816662877881313, 4.842979322212476)
(0.9322436968566397, 6.342318680155895)

But crashes with some MPI_Init_thread() errors when run via mpirun:

$ mpirun -np 2 python localcluster.py
Error in system call pthread_mutex_destroy: Device or resource busy
Abort(1090575) on node 0 (rank 0 in comm 0): Fatal error in PMPI_Init_thread: Other MPI error, error stack:
MPIR_pmi_init(130)...: PMI_Get_appnum returned -1
... and so on .... 

Has anybody done something like this before? Am I missing something? Or is it just a bad idea to use Dask for local multiprocessing?

Any help would be appreciated,

– Jeremy

Hi @jeremyfirst22, welcome to Dask Discourse forum,

I’m really surprised by this, especially since you are not using dask-mpi. Dask shouldn’t call this kind of MPI primitive.

In this case, you don’t use mpirun at all?

I don’t think it is a bad idea. My concern is that MPI sometimes doesn’t play well with spawning subprocesses from inside an MPI rank. Are you really needing to use MPI for this? Do you need synchronization between each LocalCluster instance?
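One way to check whether subprocess spawning is involved is to look at what the launcher puts in the environment, since processes started from a rank inherit it. The sketch below is only a diagnostic, not a fix. The helper name launcher_variables and the prefix list (PMI_, PMIX_, I_MPI_, OMPI_) are assumptions for illustration; neither Dask nor mpi4py defines them, and the list is probably not exhaustive:

```python
import os

# Assumed prefixes of variables set by common MPI launchers (not exhaustive).
LAUNCHER_PREFIXES = ("PMI_", "PMIX_", "I_MPI_", "OMPI_")


def launcher_variables(environ=None):
    """Return the subset of `environ` that looks like MPI launcher state."""
    if environ is None:
        environ = os.environ
    return {
        name: value
        for name, value in environ.items()
        if name.startswith(LAUNCHER_PREFIXES)
    }


if __name__ == "__main__":
    # Run once with plain `python` and once under `mpirun`, then compare.
    for name, value in sorted(launcher_variables().items()):
        print(f"{name}={value}")
```

If the output is empty with plain python and populated under mpirun, then any worker process that initializes MPI itself (for example, if the script's top-level mpi4py import is executed again in a spawned process) would see the same launcher state as its parent rank. That might explain the second MPI_Init_thread() call, but it is a guess to be verified.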

Hi @guillaumeeb thank you for the reply,

In this case, you don’t use mpirun at all?

Yes, that’s right. It is worth noting that mpirun -np 1 python localcluster.py also dies, even with only one rank, which makes sense if the issue is with the subprocess spawning. I just don’t understand why that should be a problem. My MPI implementation is Intel MPI, version 2021.4.

Are you really needing to use MPI for this?

There may be another way to do what I’m trying to do, but MPI is probably the easiest. The I/O library I use is already built on MPI, and the jobs are quite large (30-50 nodes per run). The MPI regime is embarrassingly parallel, save for a halo exchange, but I have had a very difficult time using Dask arrays vs NumPy + MPI.

Do you need synchronization between each LocalCluster instance?

The LocalCluster instances are completely independent, no synchronization or communication required between them.

Let me know if that helps shed any light, and thank you again for your help.

– Jeremy

Did you try using processes=False when creating the LocalCluster?
Also, why are you giving a scheduler_file argument when creating the Client?

My thought was that processes=True would cause each WorkerPlugin to occupy a separate block of memory, but that may be true for processes=False as well. The WorkerPlugin “private data” is actually an instance of a third-party program interface, and the goal is to run a separate instance of the program on each CPU/WorkerPlugin.

I am setting the scheduler_file argument because I will have multiple instances of the LocalCluster running simultaneously (one per MPI rank). They will each need their own place to write scheduling information, correct?

– Jeremy

I should add that the third-party program interface is loaded with a ctypes.cdll.LoadLibrary call, so if I do this with processes=False, all the WorkerPlugins end up pointing to the same DLL. My understanding is that they must be in separate processes to point to separate DLLs, which is why I need this to work with processes=True.
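The shared-DLL behaviour can be checked with a short sketch. Loading the same shared library twice in one process normally returns the same underlying handle, so every in-process plugin would share one copy of the library's global state. The example uses the C standard library as a stand-in for the third-party library (an assumption for illustration), the function name is hypothetical, and it relies on the private _handle attribute of ctypes library objects:

```python
import ctypes
import ctypes.util


def repeated_load_shares_handle(library_name="c"):
    """Load a shared library twice and report whether both loads share a handle.

    Returns None when the library cannot be located or loaded on this platform.
    """
    path = ctypes.util.find_library(library_name)
    if path is None:
        return None
    try:
        first = ctypes.cdll.LoadLibrary(path)
        second = ctypes.cdll.LoadLibrary(path)
    except OSError:
        return None
    # `_handle` is the raw handle returned by the platform loader.
    return first._handle == second._handle


if __name__ == "__main__":
    print(repeated_load_shares_handle())
```

Separate worker processes each get their own address space, so each one loads its own copy of the library, which is consistent with the need for processes=True here.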

Well, to be perfectly honest, I’m looking for two things:

  • Simply disable the use of processes, to see whether it helps in your setup (even if that is not compatible with your workflow).
  • A way to bypass the Nanny process that is used to spawn Workers. I know that this can help when using dask-mpi (e.g. by setting the --no-nanny CLI flag), but I’m not sure how to achieve it with the LocalCluster Python API. Maybe by setting the worker class to Worker and keeping the processes=True kwarg.
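The first suggestion can be sketched as a small switch between the two modes, so the same script can be run under mpirun with and without worker processes. The function names are hypothetical, and the worker counts are copied from the original script; this is a diagnostic sketch, not a recommended setup:

```python
def cluster_kwargs(use_processes):
    """Build LocalCluster keyword arguments for the diagnostic run."""
    return {
        "n_workers": 2,
        "threads_per_worker": 1,
        # False keeps the workers inside the calling process (no subprocesses).
        "processes": use_processes,
    }


def start_cluster(use_processes):
    """Start a LocalCluster and a Client; requires dask.distributed."""
    # Imported lazily so the helper above can be used without Dask installed.
    from dask.distributed import Client, LocalCluster

    cluster = LocalCluster(**cluster_kwargs(use_processes))
    return cluster, Client(cluster)
```

Calling start_cluster(use_processes=False) under mpirun and seeing no MPI_Init_thread() error would point at process spawning as the trigger, even though that mode does not give each plugin its own DLL.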

But you’re right, you’ll need independent processes at some point if you need separate blocks of memory, or separate processes for any other reason.

Well, LocalCluster does not need a scheduler_file if you are not planning to connect other Workers to it. It should be safe to drop this argument.
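A minimal sketch of what that could look like, assuming one independent LocalCluster per MPI rank. The function names are hypothetical. The Client takes its scheduler address directly from the cluster object, and scheduler_port=0 asks for a random free port, so several clusters on one machine should not collide:

```python
def per_rank_cluster_kwargs():
    """Keyword arguments for one independent LocalCluster per MPI rank."""
    return {
        "n_workers": 2,
        "threads_per_worker": 1,
        "processes": True,
        # 0 asks for a random free port for the scheduler.
        "scheduler_port": 0,
    }


def connect_local(rank):
    """Start this rank's cluster and connect to it; requires dask.distributed."""
    from dask.distributed import Client, LocalCluster

    cluster = LocalCluster(**per_rank_cluster_kwargs())
    client = Client(cluster)  # no scheduler_file: the address comes from `cluster`
    print(f"Rank {rank} scheduler at {cluster.scheduler_address}")
    return cluster, client
```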