Available options for collective communication in Dask

In my case I did the following:

model, = client.scatter([model], broadcast=False)  # upload the model to one worker
<persisted-dask-bag>.map(run_inference, model=model)

When I ran a snippet like this on a large cluster (several hundred nodes) and counted the number of times each node acted as the source for the model Future*, it turned out that only 10% of the nodes ever acted as sources. Further, a single node was the source in 78% of the transfers, while each of the other source nodes accounted for <1% of the transfers.

More importantly, I also observed that the same model was fetched several times by the same worker :confused:, which makes me believe it is being discarded after each chunk is processed.
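
One way I plan to verify this is to poll the scheduler for the model's replica locations while the computation runs; if the holder count keeps dropping back down, that would support the discard theory. A rough sketch using Client.who_has (model is the Future from the snippet above; the 10x5s polling loop is arbitrary):

import time

for _ in range(10):
    holders = client.who_has([model])[model.key]
    print(f"{len(holders)} workers currently hold the model")
    time.sleep(5)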

"Are you experiencing poor performance, or what is pointing you to an MPI-style pattern?"

There is a long delay (~1 min) before the slowest worker starts computing with the model, even for small assets (a few hundred MB). The initial scatter call itself is quite fast (a few seconds), which IIUC is just the time to upload the model to one worker. So I am assuming the large delay comes from sub-optimal transfer of the model across the pool, and I was hoping to move this asset around more effectively.
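
The closest thing to a collective that I have found so far is Client.replicate, which, as I read the docs, fans data out to workers with a configurable tree branching factor. A sketch of what I mean (the branching_factor value here is a guess, not something I have tuned):

model, = client.scatter([model], broadcast=False)
# ask the scheduler to copy the data to all workers via a tree fan-out
client.replicate([model], branching_factor=4)
<persisted-dask-bag>.map(run_inference, model=model)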

The 1-minute delay wouldn't matter much if this were a one-time operation, but for an iterative algorithm with a few hundred scatter-type operations it quickly adds up. Right now these operations take >25% of the execution time in my case.

[*] This accounting was performed by tracking distributed.worker.get_data_from_worker calls, as implemented by a collaborator.
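
For the curious, the idea (my collaborator's actual version differs in the details) is roughly to wrap that call inside every worker process and count how often each peer address shows up as a transfer source, e.g.:

from collections import Counter
import distributed.worker

def install_counter():
    # runs in each worker; wraps the peer-to-peer fetch coroutine
    counts = Counter()
    original = distributed.worker.get_data_from_worker

    async def counting_get_data(rpc, keys, worker, *args, **kwargs):
        counts[worker] += 1  # 'worker' is the address data is fetched FROM
        return await original(rpc, keys, worker, *args, **kwargs)

    distributed.worker.get_data_from_worker = counting_get_data
    distributed.worker._transfer_source_counts = counts  # stashed for retrieval

client.run(install_counter)
# later: client.run(lambda: dict(distributed.worker._transfer_source_counts))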


NOT A CONTRIBUTION