Client.scatter() producing uneven results

Hi, I’m running dask distributed on an existing ECS cluster, and I’m running into issues with client.scatter() not distributing the workload evenly. Versions are 2022.7.1 on the workers, 2022.8.0 on the scheduler, and 2022.6.0 on the client.

I have 3 workers running, and the client sees this (len(client.nthreads()) is 3). Yet when I run

data = ['a', 'b', 'c']
future = client.scatter(data)
client.who_has()

I get

client.rebalance() does nothing, and the only way I’ve found to get past this is to broadcast to all workers. But that’s causing issues with memory, since the real objects are quite large. Is there any way I can force scatter to send one item to each worker?

Disclaimer: I’m not a maintainer.

Do the worker nodes have multiple cores? From the documentation on data locality:

When a user scatters data from their local process to the distributed network this data is distributed in a round-robin fashion grouping by number of cores.

You can always manually specify the worker to scatter to with the worker parameter of scatter. Refer to the data locality documentation and the API docs).
If your workers only have a single core each, this would seem like a bug to me.

1 Like

Thought to ask a related question here.

I want all workers on a gateway-cluster to start out with the same numpy-array in memory, before I start a dask.compute(list_of_delayed_computations)-session.

Using client.scatter() distributes the array to one or a few workers, and then it has to transfer the data to other workers during the computation. But do you know if there is also a way to have all workers each have their own copy of the array, before the computation?

Instead of each task loading the data, which I guess will be slower.

I tried using broadcast=True, but still only one worker receives the data. I wonder if the reason has to do with the ReduceReplicas-policy mentioned in the API.

Best,