Specifying worker resources per task in the custom graph interface?

Hi, I see that client.get allows me to specify resources for a custom graph execution. However, as far as I can tell, this applies the resources to all tasks in the custom graph. Is there a way to specify the desired resources per task?

Something like this:

client.get(
    {
        'taskA': (...),
        'taskB': (...),
        'taskC': (...),
        'taskD': (...),
    },
    resources={
        'taskA': {'MEMORY': 5000, 'CPU': 2},
        'taskD': {'CPU': 2}
    }
)

Hi @dask-user, welcome to this forum!

I don’t think there is a way to specify resources when working directly with graphs.

But is there a reason why you are working directly with this API? It is a really low-level API that should be used only when you really have to. Couldn’t you use the Delayed API to build your task graph?

This way, you would be able to do something like:

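import dask

# inc, double, add and data are assumed to be defined elsewhere.
output = []
for x in data:
    a = dask.delayed(inc)(x)
    b = dask.delayed(double)(x)
    # Only tasks created inside this block get the resource annotation.
    with dask.annotate(resources={'GPU': 1}):
        c = dask.delayed(add)(a, b)
    output.append(c)

total = dask.delayed(sum)(output)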

This specifies resources only for specific layers of the graph.

Hi, yes the reason is that the graph is actually being built outside of the dask ecosystem. Dask is being used just for the execution. However, I think based on what you provided here, what I could do is take the graph spec and run it through a process that translates it to a delayed object, applying the appropriate annotations where desired.

So it seems that the ability to apply different resources to different parts of the graph does exist; it just isn’t exposed in the custom graph API. (For the record, I can specify resources in the custom graph API, but only as a global resource spec, not task by task.) Is that an accurate assessment? If so, would there be any appetite to expose that functionality directly in the custom graph API?

Thanks.

Yes, you can only specify resources for different parts of the graph through a HighLevelGraph object. There is a way to convert a graph to this object, but I’m not sure how easily you can then modify the object to add custom annotations. See Annotations for custom graphs in dask - Stack Overflow.

There is still an open issue about that: Support Task Annotations within the graph · Issue #3783 · dask/dask · GitHub. There were some performance concerns, so currently only Layer-based annotations are supported. See also Layer Annotations · Issue #6701 · dask/dask · GitHub.

Thanks for the background and info. I see now that I can build the HighLevelGraph directly rather than translating through the delayed interface. However, I’m realizing that to properly annotate task level worker resources through the HLG, I’ll have to do one of two things.

(1) Create an HLG with a layer for each individual task. This will make applying annotations easy, as I can apply them independently to the layer associated with the task that needs the specific resources.

(2) Create layers that group together nodes requiring the same resources. This is much harder, since it will not always be possible to put these tasks in the same layer: they might have conflicting upstream/downstream layer-level dependencies. This will require an algorithm to break up the layers into smaller layers (with the same annotations) to avoid these conflicting layer dependencies, and I’m pretty sure getting an “optimal” grouping of layers will be an NP-hard problem. I’m sure there are some other heuristics I can use to achieve something good enough and better than 1 layer per node, though.

Ideally I’d go with the easier option 1. However, my question is, is it very inefficient to create an HLG with a layer for each node? For some context I will likely have anywhere from 1k - 200k nodes in the graph.

Edit: regarding a heuristic for avoiding dependency conflicts in layers. I think the simplest approach would be the following, which would make option 2 a viable option.

  1. Create a first set of layers using the groups provided by networkx.topological_generations.
  2. Subdivide the layers from above based on the same worker resource requests.
  3. Merge any adjacent layers in the topological order that have identical resource requests into one layer.

Starting with the groups provided by networkx.topological_generations will guarantee that no layer dependency conflicts can arise. It’s not perfect, but it reduces the number of layers required in my use case (and probably most use cases) by several orders of magnitude.
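
For anyone who wants to try this, here is a minimal sketch of the three steps above. It is only my reading of the heuristic, not tested on a real cluster. To keep it dependency-free, it computes the generations with a small Kahn-style helper instead of calling networkx. The names `topological_generations`, `group_into_layers`, `deps` and `resources` are made up for this example. `deps` is assumed to map each task key to the set of keys it depends on, and keys are assumed to be sortable (e.g. strings).

```python
from collections import defaultdict


def topological_generations(deps):
    """Yield lists of keys; each list depends only on keys in earlier lists."""
    dependents = defaultdict(list)
    indegree = {}
    for key, parents in deps.items():
        indegree[key] = len(parents)
        for parent in parents:
            dependents[parent].append(key)
    generation = sorted(k for k, n in indegree.items() if n == 0)
    seen = 0
    while generation:
        yield generation
        seen += len(generation)
        ready = []
        for key in generation:
            for child in dependents[key]:
                indegree[child] -= 1
                if indegree[child] == 0:
                    ready.append(child)
        generation = sorted(ready)
    if seen != len(deps):
        raise ValueError("dependency cycle detected")


def group_into_layers(deps, resources):
    """Return [(resource_spec, [keys]), ...] in topological order."""
    layers = []  # each entry: (hashable spec, list of keys)
    for generation in topological_generations(deps):
        # Step 2: split the generation by resource request.
        by_spec = defaultdict(list)
        for key in generation:
            spec = tuple(sorted(resources.get(key, {}).items()))
            by_spec[spec].append(key)
        specs = sorted(by_spec)
        # Put the group matching the previous layer first so it can merge.
        if layers and layers[-1][0] in by_spec:
            specs.remove(layers[-1][0])
            specs.insert(0, layers[-1][0])
        # Step 3: merge with the previous layer when the specs are identical.
        for spec in specs:
            if layers and layers[-1][0] == spec:
                layers[-1][1].extend(by_spec[spec])
            else:
                layers.append((spec, list(by_spec[spec])))
    return [(dict(spec), keys) for spec, keys in layers]


# Hypothetical dependencies for the four tasks in my original example.
deps = {
    'taskA': set(),
    'taskB': {'taskA'},
    'taskC': {'taskA'},
    'taskD': {'taskB', 'taskC'},
}
resources = {
    'taskA': {'MEMORY': 5000, 'CPU': 2},
    'taskD': {'CPU': 2},
}
layers = group_into_layers(deps, resources)
```

Because every layer is a contiguous run of one topological ordering of the tasks, dependencies between layers can only point backwards, so no cycles between layers should appear. Each resulting group would still need to be turned into an actual annotated HLG layer, which this sketch does not do.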

Yes, I think one layer per node is a very bad idea. Having 200k tasks already starts to be a bit hard for the Scheduler to handle, so I guess having 200k layers would be much worse!

Anyway, here we are discussing things far above my level of knowledge on graphs and scheduling.

Let’s see if @rjzamora or @fjetter have some time to look at this.

Sorry for the incredibly late reply.

In case this is still relevant, the HLG/Layer interface also accepts a callable that accepts one argument (the key) as annotations. This way you can likely set all the custom annotations with one layer.

FWIW, 200k tasks are fine for the scheduler. 200k layers are a little less efficient but should also be doable; this should just add a little overhead when submitting the graph.

@fjetter - looks like this gives close to exactly what I want from the top level client.get api. The working solution looks very close to my desired api in the main post:

client.get(
    {
        'taskA': (...),
        'taskB': (...),
        'taskC': (...),
        'taskD': (...),
    },
    resources=lambda task: {
        'taskA': {'MEMORY': 5000, 'CPU': 2},
        'taskD': {'CPU': 2}
    }.get(task)
)
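
A small variation, as a sketch only: the lambda above rebuilds the dictionary every time it is called, so for a large graph it may be worth building the mapping once and passing a named function instead. `TASK_RESOURCES` and `resources_for` are names made up for this example.

```python
# Per-task resource requests; tasks not listed here get no restriction.
TASK_RESOURCES = {
    'taskA': {'MEMORY': 5000, 'CPU': 2},
    'taskD': {'CPU': 2},
}


def resources_for(task):
    """Return the resource spec for a task key, or None if it has none."""
    return TASK_RESOURCES.get(task)
```

The function would then be passed as resources=resources_for in place of the lambda. It returns None for unlisted tasks, the same as the lambda does.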

Thank you! I would highly recommend mentioning this in the custom graph docs, as this is a very valuable, but unadvertised feature.