Restrict parallelism in different parts of the compute graph

Hi :wave:

I have a process that looks like this:

  1. Get a list of items (JSON config)
  2. For each item, map it to embarrassingly parallel sub-tasks, but with a choke point:
    a. Sub-task 1,2,3: parallel
    b. Sub-task 4: one at a time
    c. Sub-task 5,6,7: parallel

I’m looking for a way to:

  • Run locally, on a single system
  • Limit total processes (no threads) to N
  • Limit sub-task 4 to run just one at a time
  • Keep most of the logic as independent as possible

Hi @Risotto6020, welcome to Dask community!

Could you provide a minimal reproducible example so we can experiment with it?

For sub-task 4, when you say one at a time, do you mean that only one such task should be running at a time, across all items?

You could probably achieve this one way or another with Delayed or Futures, but it depends on your needs; a toy example would help us understand.

Here is a toy example, with 2 steps before and after the choke point instead of the 3 sub-tasks in the original post.

```python
from dask import delayed

@delayed
def load(file):
    """Can run max 3 in parallel"""
    return str(file)

@delayed
def pre_proc(data):
    """Can run max 3 in parallel"""
    return [data, data]

@delayed
def compute(data):
    """Sadly, only one compute can run at a time"""
    return " ".join(data)

@delayed
def post_proc(data):
    """Can run max 3 in parallel"""
    return [data, data]

@delayed
def post(data):
    """Can run max 3 in parallel"""
    print(data)

graphs = []
for x in ["a", "b", "1", "2"]:
    data = load(x)
    cleaned = pre_proc(data)
    output = compute(cleaned)
    ready = post_proc(output)
    graphs.append(post(ready))
```

I want to call compute on all the graphs at once (or add a dummy node on top to trigger a single unified compute), and let Dask enforce the maximum concurrency limit for each Delayed task.
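A minimal sketch of the two ways to trigger one unified compute described above. To keep it self-contained it uses two short stand-in functions (hypothetical, not the full pipeline from the post); the point is only the final call: either pass every leaf to a single `dask.compute`, or put one node on top with `delayed(list)`.

```python
import dask
from dask import delayed

# Hypothetical stand-ins for the pipeline steps, kept tiny on purpose.
@delayed
def load(item):
    return str(item)

@delayed
def compute(data):
    return data.upper()

graphs = [compute(load(x)) for x in ["a", "b", "1", "2"]]

# Option 1: hand every leaf to one dask.compute call; Dask merges them into
# a single graph and schedules all tasks together. Returns a tuple.
results = dask.compute(*graphs)

# Option 2: a "dummy" node on top that depends on every leaf. delayed()
# traverses the list argument, so list() receives the computed values.
everything = delayed(list)(graphs)
results_again = everything.compute()

print(results)        # ('A', 'B', '1', '2')
print(results_again)  # ['A', 'B', '1', '2']
```

Both forms build one merged graph, so the scheduler sees every task at once and can apply whatever concurrency limits are in place.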

Okay, one way I can see to implement this is with Locks or Semaphores.

First, if every step other than compute has the same parallelism limit, just create a cluster with as many processes/threads as you want to use at most. If not, you can use a Semaphore.
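A sketch of both ideas, assuming a local `LocalCluster` of single-threaded worker processes and the `dask.distributed.Semaphore` pattern of creating the semaphore on the client and passing it into tasks. `N_PROCS`, `PRE_PROC_LIMIT`, `run` and the `time.sleep` placeholder are hypothetical. Each task records when it holds a lease, so you can check that no more than `PRE_PROC_LIMIT` ever overlap even though `N_PROCS` processes are available.

```python
import time

import dask
from dask import delayed
from dask.distributed import Client, LocalCluster, Semaphore

N_PROCS = 4         # hypothetical total process budget ("N" in the question)
PRE_PROC_LIMIT = 2  # hypothetical per-step limit, smaller than N_PROCS


@delayed
def pre_proc(data, sem):
    # Only as many tasks as the semaphore has leases run this body at once;
    # the others block here until a lease is released.
    with sem:
        start = time.time()
        time.sleep(0.2)  # stand-in for real work
        return data, start, time.time()


def run(items):
    # One single-threaded worker process per slot: at most N_PROCS tasks run
    # concurrently overall, each in its own process.
    with LocalCluster(n_workers=N_PROCS, threads_per_worker=1,
                      processes=True) as cluster, Client(cluster):
        # Cluster-wide semaphore shared by every pre_proc task.
        sem = Semaphore(max_leases=PRE_PROC_LIMIT, name="pre-proc")
        return dask.compute(*[pre_proc(x, sem) for x in items])


if __name__ == "__main__":
    # The guard matters: worker processes are spawned and re-import this module.
    results = run(["a", "b", "1", "2", "c", "d"])
    for data, start, end in results:
        print(data, f"{end - start:.2f}s")
```

Steps without a `with sem:` block are limited only by the cluster size, so the two mechanisms compose: the cluster caps total processes, and each semaphore caps one step.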

For the compute part, just use a Lock:

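```python
from dask import delayed
from dask.distributed import Lock  # requires a distributed Client

@delayed
def compute(data):
    """Sadly, only one compute can run at a time"""
    with Lock('compute-lock'):  # one holder at a time, cluster-wide
        return " ".join(data)
```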
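An end-to-end sketch of the Lock approach, assuming a `LocalCluster` of single-threaded worker processes; `run`, `n_procs` and the `time.sleep` placeholder are hypothetical. `Lock` needs a running distributed scheduler (a `Client`), so the graph is computed inside one. Each `compute` records when it holds the lock, which lets you check that no two calls overlap.

```python
import time

import dask
from dask import delayed
from dask.distributed import Client, LocalCluster, Lock


@delayed
def load(item):
    return str(item)


@delayed
def compute(data):
    # Named, cluster-wide lock: only one compute task can be inside this
    # block at a time, no matter which worker process it runs on.
    with Lock("compute-lock"):
        start = time.time()
        time.sleep(0.1)  # stand-in for the single-instance step
        result = " ".join([data, data])
        end = time.time()
    return result, start, end


def run(items, n_procs=3):
    # n_procs single-threaded worker processes: load() runs in parallel,
    # compute() is serialized by the lock.
    with LocalCluster(n_workers=n_procs, threads_per_worker=1,
                      processes=True) as cluster, Client(cluster):
        return dask.compute(*[compute(load(x)) for x in items])


if __name__ == "__main__":
    results = run(["a", "b", "1", "2"])
    for result, start, end in results:
        print(result)
```

One trade-off to keep in mind: a task waiting on the lock still occupies its worker's only thread, so with few processes, queued `compute` tasks can hold slots that the parallel steps could otherwise use.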