How do I get workers to retain a large object needed over several task graphs?

I have the following process where batches are processed, reusing the same large (~1GB) object for each batch:

import dask
import dask.bag as db

# Persist the large object once so it only needs to be materialised a single time.
large_object_delayed = dask.delayed(
    large_object, pure=True
).persist()

for batch in list_of_batches:
    batch_bag = db.from_sequence(batch, npartitions=len(batch))
    results = db.map(
        func,
        batch_bag,
        large_object=large_object_delayed,
    ).compute()

My task stream ends up looking like this:

The problem here is that the workers are being sent large_object again and again for each batch, which wastes a lot of time. I know that combining the batches into one compute call would solve this, but I can’t do that, as I need to save results in batches so that the process can be resumed if interrupted.
I’ve tried fiddling with worker memory thresholds to no avail. How can I get the workers to hang on to the large object?
Many thanks!

Hi @benjeffery, welcome to Dask community!

Have you tried whether using the Client.scatter function changes anything, in particular with broadcast=True?
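
A minimal sketch of that suggestion (assuming client is a connected distributed.Client, and large_object, func and list_of_batches are the objects from the original post):

import dask.bag as db
from distributed import Client

client = Client()  # or connect to an existing scheduler

# Ship the large object to every worker up front, instead of embedding it
# in each batch's task graph.
large_object_future = client.scatter(large_object, broadcast=True)

for batch in list_of_batches:
    batch_bag = db.from_sequence(batch, npartitions=len(batch))
    results = db.map(
        func,
        batch_bag,
        large_object=large_object_future,
    ).compute()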

I did attempt scatter for a similar purpose and noticed that:

  1. The same object seems to be transferred several times to a worker for a single compute call.
  2. The object is definitely re-transferred across compute calls.
    Available options for collective communication in Dask - #9 by vij

I am curious if there is a mechanism to specify during the scatter that the object needs to be preserved in a local cache for future use, so that when the map calls are made the worker checks the local cache for the required key before fetching it from other workers.

Thanks for your replies. I’ve tried scatter but the resulting behaviour didn’t change. I’ve just found Active Memory Manager — Dask.distributed 2023.8.0 documentation which suggests making a memory manager policy - I’ll try that out and report back.

Just adding client.amm.stop() fixed this in the end! It seems that the default AMM is quite aggressive, clearing everything back down to a single copy every two seconds.
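
For anyone finding this later, a sketch of that fix (assuming client is the distributed.Client driving the batch loop above):

# Stop the Active Memory Manager so extra replicas of the persisted or
# scattered object are not dropped between compute calls.
client.amm.stop()

# ... run the batch loop as before ...

# Optionally re-enable the AMM once the batches are done.
client.amm.start()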


Hi,

You are right, at the moment the AMM will get in the way of your workflow, and the only way to prevent that is to stop it.

The functionality you’re looking for is Reimplement `replicate()` using the Active Memory Manager · Issue #6578 · dask/distributed · GitHub, which at the moment is shelved indefinitely. Thanks for raising a use case for it.

  2. The object is definitely re-transferred across compute calls.

Yes, this is correct. To prevent that you need to either disable the AMM or wait for #6578.
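
If you prefer to disable the AMM through configuration rather than at runtime, a sketch using the distributed.scheduler.active-memory-manager.start config key (the setting must be applied before the scheduler is created):

import dask
from distributed import Client, LocalCluster

# Prevent the scheduler from starting the Active Memory Manager at all.
dask.config.set({"distributed.scheduler.active-memory-manager.start": False})

cluster = LocalCluster()
client = Client(cluster)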

  1. The same object seems to be transferred several times to a worker for a single compute call.

This should not happen. The AMM will preserve a number of replicas equal to the number of unassigned waiter tasks, plus the number of workers that waiter tasks have already been assigned to: https://github.com/dask/distributed/blob/acb28095034dcbf546531d7eee7bd39217e19b24/distributed/active_memory_manager.py#L533-L541

Do you have a reproducer for what you’re observing?

I am curious if there is a mechanism to specify during the scatter that the object needs to be preserved

scatter(broadcast=True) and replicate are incompatible with AMM at the moment. The tracking issue is #6578.