Specifying resources for client.map causes no task submission

Hey there. I am new to Dask, but I have successfully set up dask/dask on a Kubernetes cluster with Helm.

I have been running into an issue where workers die when I call client.map(f, files). The files are large (about 1 GB each), and the function f loads each one and pushes it to a zarr store.

I watch the dashboard and notice that when len(files) grows, the number of tasks per worker increases, memory approaches the limit specified in values.yaml resources for helm install, the worker dies and things fail out.

So, I thought it might be a good idea to specify resources for a task with something like:

futures = client.map(f, files, resources={'MEMORY': 2e9})

but the tasks are never submitted and the dashboard shows no new tasks. Am I missing something?

How does Dask know to stop submitting tasks to a worker if doing so would push the worker past its memory limit? Is this a nuance of client.map? Should I use dask.delayed or client.submit instead?

I can avoid this by running submit and gather over batches to make sure no worker gets more than k tasks, but I feel like I am reinventing the wheel here…
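
The batching workaround described above could look roughly like the sketch below. `batched` and `map_in_batches` are names made up for illustration; the only Dask calls assumed are `client.map` and `client.gather`, and `client` is assumed to be an existing `distributed.Client`.

```python
from itertools import islice


def batched(items, batch_size):
    """Yield successive lists of at most batch_size items."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    iterator = iter(items)
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            return
        yield batch


def map_in_batches(client, func, items, batch_size):
    """Run func over items, keeping at most batch_size tasks in flight."""
    results = []
    for batch in batched(items, batch_size):
        futures = client.map(func, batch)
        # Block until the whole batch is done before submitting more work.
        results.extend(client.gather(futures))
    return results
```

With n workers, a batch size of n * k caps the cluster at roughly k tasks per worker, at the cost of workers sitting idle while the slowest task in each batch finishes.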

Hi @ljstrnadiii, welcome! Do you have any screenshots of the dashboard while you run out of memory? I’m wondering if there is a leak somewhere that is causing memory to accumulate rather than being freed. If you run a single task with client.submit, do you see the memory spike and then go back down after the task is done? The distributed scheduler should be able to identify the amount of memory that each task takes and take that into account when scheduling them, but it seems that isn’t happening properly here.

One point of clarification: specifying worker resources as you have done isn’t actually about configuring when to hold off on scheduling. Instead, it allows you to annotate specific workers that have specific resources (like a GPU or a high-memory worker), and then you can specify those resources in submit/map/compute, which Dask will take into account when deciding which worker to dispatch to. So unless you have started a worker with a "MEMORY" resource, I wouldn’t expect that to do anything (though it should still submit!).
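
To make the distinction concrete, here is a minimal sketch of the two halves of the resources mechanism. The worker side is shown only as a comment; the value `MEMORY=8e9` and the helper name `map_with_memory_resource` are illustrative assumptions, not settings from this thread, and `client` is assumed to be an existing `distributed.Client`.

```python
# Worker side (shell, not Python): advertise an abstract resource at startup,
# for example
#   dask worker <scheduler-address> --resources "MEMORY=8e9"
# The number is only a label that Dask counts against; it is not measured memory.


def map_with_memory_resource(client, func, items, bytes_per_task=2e9):
    """Submit tasks that each claim bytes_per_task of the MEMORY resource."""
    # A worker advertising MEMORY=8e9 would run at most four such tasks at once.
    return client.map(func, items, resources={"MEMORY": bytes_per_task})
```

Without a worker that advertises a matching resource, I would expect such tasks to wait in the scheduler rather than run, though I have not verified that this explains the empty dashboard here.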

The underlying opening function is rioxarray.open_rasterio, which takes a path to GCS. I bet the underlying utility opens the bytes, wraps them in a memoryfile, opens that, and sends it to rioxarray. This always seems to leave some memory behind. There is also memory left behind in the bytes stored per worker even after I cancel all futures; it is on the order of gigabytes of unmanaged memory.

Thanks for the point of clarification.

I also found that when I specified a chunk, I would often get a comm error. Maybe this is due to task overload from too many underlying dask array tasks when writing to a zarr store?

Some pseudo code for what I am trying to accomplish:

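import rioxarray

def read_write_tiff(f, store, lock=None):
    # f is the path to one file on GCS, store is the target zarr store
    with rioxarray.open_rasterio(f) as dset:
        # configure lat and lon (get_sliced_dict is my own helper, not shown)
        slice_dict = get_sliced_dict(dset)

        # write the dang thing
        dset.to_zarr(
            store=store,
            region=slice_dict,
            synchronizer=lock,  # self.lock in my actual class-based code
            safe_chunks=False,
        )

# client, paths and store are assumed to exist already
futures = client.map(read_write_tiff, paths, store=store)
results = client.gather(futures)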

Each path is a file on GCS that is about 100-500 MB. I can successfully write <30 GB, but things seem to struggle when I push to 50 GB. Ideally I could write out hundreds of GB. I have seen comm errors, worker died errors, very long hangs, etc.

Any rules of thumb for using dask like this? Any thoughts would be greatly appreciated. Cheers!

Maybe for large read ops like this, we could use something like:

import dask

with dask.config.set(scheduler='processes'):
    data = large_read(f)

Perhaps that would release the memory more effectively?

If there is significant unmanaged memory after a task is done, there are a few things you can try.

  1. Try the "processes" scheduler, as you suggest. That has more overhead per-task, but task overhead sounds like a rounding error in your case. I’m curious to see if that helps!
  2. Try to manually trim unmanaged memory every once in a while with something like this. It could even be run on a loop if you are constantly generating unmanaged memory.
  3. If you want to get really fancy, you might try some of the ideas in this video:
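
For point 2, one approach commonly used with Dask on Linux is to ask glibc to return freed memory to the operating system via `malloc_trim`. The sketch below assumes glibc is available as `libc.so.6` and that `client` is an existing `distributed.Client`; it may not be exactly what the link above described.

```python
import ctypes


def trim_memory():
    """Ask glibc to release free heap memory back to the OS.

    Returns malloc_trim's result (1 if memory was released, 0 otherwise),
    or None when glibc's malloc_trim is not available on this platform.
    """
    try:
        libc = ctypes.CDLL("libc.so.6")
        return libc.malloc_trim(0)
    except (OSError, AttributeError):
        return None


# Run once on every worker; call it periodically if memory keeps creeping up:
# client.run(trim_memory)
```

This only helps when the unmanaged memory is heap that has already been freed by Python but not yet returned by the allocator; it does nothing for memory that is still referenced, such as an in-memory cache.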

Thanks for the thoughts!

  1. The "processes" scheduler did not do anything.
  2. Manually trimming unmanaged memory did not do anything.
  3. I might get fancy one day lol. Thanks for sharing that.

Turns out that rioxarray.open_rasterio() takes a cache argument, which defaults to True. Setting this to False prevents all this unmanaged memory. Since I only read the underlying array data once, there is no need for it to be set to True.
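
In code, the change amounts to passing `cache=False` when opening the file. In this sketch, `open_tiff_uncached` is a made-up name and the `open_rasterio` parameter is a hypothetical hook, added only so the function can be exercised without rioxarray installed; by default it uses `rioxarray.open_rasterio`.

```python
def open_tiff_uncached(path, open_rasterio=None):
    """Open a raster without keeping the loaded data cached in memory."""
    if open_rasterio is None:
        # Imported lazily so this module loads even where rioxarray is absent.
        import rioxarray

        open_rasterio = rioxarray.open_rasterio
    # cache=False: each file is read once, so caching the array only costs memory.
    return open_rasterio(path, cache=False)
```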

Regardless, good stuff to know regarding all the points above!

Can you point to any light reading on how dask decides to schedule tasks?

Consider the case where we map a list of files to a read function. If the read function takes advantage of all available cores, how does Dask decide to share these resources between multiple tasks? It raises the question: ‘is it better to have many tasks using fewer cores or fewer tasks using more cores per worker?’

This is probably dependent on the code behind the read function, for example, but how does dask make decisions around this type of thing? I have caught a glimpse of this notion of priority. Is that related?

My guess is that dask will default to giving a task the resources it takes and schedule accordingly.

Dask uses a pretty complex set of heuristics and state machines to schedule tasks, and I’m far from the expert on those. But I can point you towards the Distributed docs, and especially the section on scheduling policies for more reading on that.

I think you’ll probably have a bad time if the read function uses multiprocessing while Dask is trying to do the same. I’m not aware of a good way to mix and match concurrency frameworks like that, and Dask mostly assumes tasks are synchronous and can be safely executed without contending with other tasks. If there is some shared resource pool that the tasks need to work around, you might look into Locks.

As far as “many tasks” vs “few tasks”, it will certainly depend on the function in question, and I’m not sure there are any general rules there. For instance, the answer may depend on whether your function is compute-bound or network-bound.