Hi
I have a process that looks like this:
- Get a list of items (from a JSON config)
- For each item, run a chain of sub-tasks; the items are embarrassingly parallel, but there is a choke point:
  a. Sub-tasks 1, 2, 3: parallel
  b. Sub-task 4: one at a time
  c. Sub-tasks 5, 6, 7: parallel
I’m looking for a way to:
- Run locally, on a single system
- Limit total processes (no threads) to N
- Limit sub-task 4 to run just one at a time
- Keep most of the logic as independent as possible
Hi @Risotto6020, welcome to Dask community!
Could you provide a minimal reproducible example so we can try it out?
For sub-task 4, when you say one at a time, do you mean that only one sub-task 4 should be running at any given time, across all items?
You could probably achieve this with either Delayed or Futures, but it depends on your needs, and a toy example would help us understand them.
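For comparison, here is a minimal sketch of the Futures flavour, assuming `dask.distributed` is installed. `client.submit` returns a Future immediately, and passing a Future into another `submit` call makes that task wait for (and receive) its result, so each item's chain needs no explicit waiting. The step functions `load` and `transform` are hypothetical placeholders, and `processes=False` is only there to keep the sketch quick to start.

```python
from dask.distributed import Client


# Hypothetical placeholder steps; substitute the real sub-tasks.
def load(item):
    return str(item)


def transform(data):
    return data.upper()


# Two in-process workers with one thread each (sketch only).
client = Client(processes=False, n_workers=2, threads_per_worker=1)

# Passing the inner Future as an argument chains the two steps per item.
futures = [client.submit(transform, client.submit(load, x)) for x in ["a", "b", "1", "2"]]
results = client.gather(futures)

client.close()  # also shuts down the cluster this Client created
```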
Here is a toy example, with two steps before and after the bottleneck instead of the three sub-tasks in my original post.
```python
from dask import delayed

@delayed
def load(file):
    """Can run max 3 in parallel"""
    return str(file)

@delayed
def pre_proc(data):
    """Can run max 3 in parallel"""
    return [data, data]

@delayed
def compute(data):
    """Sadly, only one compute can run at a time"""
    return " ".join(data)

@delayed
def post_proc(data):
    """Can run max 3 in parallel"""
    return [data, data]

@delayed
def post(data):
    """Can run max 3 in parallel"""
    print(data)

graphs = []
for x in ["a", "b", "1", "2"]:
    data = load(x)
    cleaned = pre_proc(data)
    output = compute(cleaned)
    ready = post_proc(output)
    graphs.append(post(ready))
```
I want to call compute on all the graphs at once (or add a dummy node on top that triggers a single unified compute), and let Dask enforce the maximum concurrency limit for each kind of Delayed task.
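Both options can be sketched with plain `dask`, using a cut-down, hypothetical version of the pipeline: passing every `Delayed` to a single `dask.compute` call merges them into one graph, and a "dummy" node (here the hypothetical `gather_all`) that takes every output as an argument does the same through one `.compute()`. Note that neither option enforces any concurrency limit by itself.

```python
import dask
from dask import delayed


@delayed
def load(file):
    return str(file)


@delayed
def compute(data):
    return " ".join([data, data])


# One independent chain per item; nothing runs yet.
outputs = [compute(load(x)) for x in ["a", "b", "1", "2"]]

# Option 1: hand every Delayed to a single dask.compute call (returns a tuple).
results = dask.compute(*outputs)


# Option 2: a hypothetical "dummy" node that depends on every output.
@delayed
def gather_all(*parts):
    return list(parts)


combined = gather_all(*outputs).compute()
```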
Okay, one way I can see to implement this is with Locks or Semaphores.
First, if every step other than compute has the same parallelism limit, just create a cluster with as many processes/threads as you want to use at most. If not, you can use a Semaphore for each step that needs its own limit.
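A minimal sketch of that advice, assuming `dask.distributed` is installed: `LocalCluster(n_workers=N, threads_per_worker=1)` caps total concurrency at N tasks, and a `Semaphore` passed into a task caps one step below that. `N = 3`, the semaphore name `"load"` and its limit of 2 are hypothetical. The sketch uses `processes=False` so it starts quickly and runs without a `__main__` guard; for the no-threads requirement you would pass `processes=True` and create the cluster under `if __name__ == "__main__":`. Each task records when it held its lease so the limit can be checked.

```python
import time

import dask
from dask import delayed
from dask.distributed import Client, LocalCluster, Semaphore

N = 3  # hypothetical total concurrency limit

# processes=False keeps the sketch self-contained; use processes=True
# (inside an `if __name__ == "__main__":` block) for one process per worker.
cluster = LocalCluster(n_workers=N, threads_per_worker=1, processes=False)
client = Client(cluster)

# Cluster-wide semaphore: at most 2 "load" tasks hold a lease at once.
load_sem = Semaphore(max_leases=2, name="load")


@delayed
def load(item, sem):
    with sem:
        start = time.monotonic()
        time.sleep(0.2)  # stand-in for real I/O
        return item, start, time.monotonic()


tasks = [load(x, load_sem) for x in ["a", "b", "1", "2", "3", "4"]]
intervals = dask.compute(*tasks)

load_sem.close()
client.close()
cluster.close()
```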
For the compute part, just use a Lock:
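```python
from dask import delayed
from dask.distributed import Lock

@delayed
def compute(data):
    """Sadly, only one compute can run at a time"""
    # A named distributed Lock is held by at most one task cluster-wide.
    with Lock('compute-lock'):
        return " ".join(data)
```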
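A runnable version of the Lock approach, assuming `dask.distributed` is installed. `dask.distributed.Lock` coordinates through the scheduler, so it needs a `Client`; it will not work with the default local scheduler. Each hypothetical `compute` also returns when it acquired and released the lock, so the non-overlap can be checked; those timing fields and `processes=False` are for the sketch only.

```python
import time

import dask
from dask import delayed
from dask.distributed import Client, Lock

client = Client(processes=False, n_workers=3, threads_per_worker=1)


@delayed
def pre_proc(item):
    return [str(item), str(item)]


@delayed
def compute(data):
    # Only one task cluster-wide can hold "compute-lock" at a time.
    with Lock("compute-lock"):
        start = time.monotonic()
        time.sleep(0.1)  # stand-in for the real bottleneck work
        return " ".join(data), start, time.monotonic()


results = dask.compute(*[compute(pre_proc(x)) for x in ["a", "b", "1", "2"]])
client.close()
```

The other steps still run in parallel across the three workers; only the body of `compute` is serialised.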