Workers do not keep data from `client.scatter(..., broadcast=True)`

When using `client.scatter(..., broadcast=True)`, the data is briefly scattered to all workers, but is then immediately removed from all but one.

My expectation would be that all workers retain the data. Happy to report this as a bug with more information about my environment, but posting here first in case I’m misunderstanding how scatter operates.

MRE

```python
from distributed import Client

client = Client(n_workers=3, threads_per_worker=1)
future = client.scatter('single_object', broadcast=True)
print(client.who_has())
```

For a brief moment, all workers have the data

> print(client.who_has())
{'str-e8fd9f45ede273e1cf488f4d4d842f5c': ('tcp://127.0.0.1:55450', 'tcp://127.0.0.1:55449', 'tcp://127.0.0.1:55451')}

but within a second, only 1 worker retains the data

> print(client.who_has())
{'str-e8fd9f45ede273e1cf488f4d4d842f5c': ('tcp://127.0.0.1:55449',)}

My understanding is that any task that depends on the scattered future will subsequently only get scheduled on that one worker that has the future’s data. However, I would like the subsequent tasks to still be distributed across all the workers.

Hi @Dahn, welcome to the Dask Discourse forum,

What you see is due to the Active Memory Manager.

See this issue. The workaround for getting a real broadcast scatter is also described in the documentation:

Setting this flag to True is incompatible with the Active Memory Manager’s ReduceReplicas policy. If you wish to use it, you must first disable the policy or disable the AMM entirely.
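As a rough sketch of the "disable the AMM entirely" option, adapting the MRE above: this assumes the `distributed.scheduler.active-memory-manager.start` config key is what controls whether the AMM starts with the scheduler, and that it is set before the cluster is created. The names `AMM_DISABLED` and `broadcast_without_amm` are only illustrative, not part of Dask.

```python
import dask
from distributed import Client

# Assumption: this key decides whether the Active Memory Manager
# is started together with the scheduler.
AMM_DISABLED = {"distributed.scheduler.active-memory-manager.start": False}


def broadcast_without_amm(data, n_workers=3):
    """Start a local cluster with the AMM off and broadcast `data` to all workers."""
    # The config must be in effect while the scheduler is being created.
    with dask.config.set(AMM_DISABLED):
        client = Client(n_workers=n_workers, threads_per_worker=1)
    future = client.scatter(data, broadcast=True)
    return client, future


if __name__ == "__main__":
    client, future = broadcast_without_amm("single_object")
    # With the AMM off, the extra replicas should no longer be dropped.
    print(client.who_has())
    client.close()
```

If the cluster is already running, calling `client.amm.stop()` should have a similar effect, though I have not checked that against your setup. Removing only the `ReduceReplicas` policy from the AMM configuration is the other option the documentation mentions.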
