When working with multiprocessing.Pool, I can use imap_unordered(fn, iterable) to map the function fn over each element of iterable without consuming the whole iterable upfront. Is this execution model available in dask?
I tried to achieve the same with Client.submit and as_completed, but as_completed seems to consume the whole iterable before any of the tasks start.
The following example shows what I mean:
from time import sleep
import multiprocessing as mp

from dask.distributed import Client, as_completed
from tqdm import tqdm


def process(i):
    sleep(0.01)
    return i


if __name__ == "__main__":
    client = Client()
    total = 10_000

    print("With dask")
    # 'source' bar advances as tasks are submitted, 'sink' bar as results arrive.
    tasks = tqdm((client.submit(process, i) for i in range(total)), desc='source', position=0, total=total, leave=True)
    res = sum(tqdm((x[1] for x in as_completed(tasks, with_results=True)), desc='sink', position=1, total=total, leave=True))
    print(res)

    print("With multiprocessing")
    with mp.Pool() as pool:
        tasks = tqdm(range(total), desc='source', position=0, total=total, leave=True)
        res = sum(tqdm(pool.imap_unordered(process, tasks), desc='sink', position=1, total=total, leave=True))
        print(res)
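
To make the behaviour I am after more concrete, here is a minimal sketch that uses the standard library's concurrent.futures instead of dask. The helper name imap_unordered_bounded and its max_in_flight parameter are my own, made up for illustration, and are not part of any library. It pulls a new item from the source only when an earlier task has finished, so the source is never consumed more than a bounded number of items ahead of the results. I assume something similar could be built on top of Client.submit, but I have not verified that.

```python
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait
from itertools import islice
from time import sleep


def process(i):
    sleep(0.01)
    return i


def imap_unordered_bounded(executor, fn, iterable, max_in_flight):
    """Yield fn(x) for each x in iterable, in completion order.

    Hypothetical helper: at most max_in_flight tasks are pending at any
    time, and the source is advanced only as earlier tasks finish.
    """
    it = iter(iterable)
    # Prime the window with the first max_in_flight items only.
    pending = {executor.submit(fn, x) for x in islice(it, max_in_flight)}
    while pending:
        done, pending = wait(pending, return_when=FIRST_COMPLETED)
        # Refill before yielding so workers stay busy while the caller
        # handles results: one new item per finished task.
        for x in islice(it, len(done)):
            pending.add(executor.submit(fn, x))
        for fut in done:
            yield fut.result()


if __name__ == "__main__":
    total = 200
    with ThreadPoolExecutor(max_workers=4) as executor:
        res = sum(imap_unordered_bounded(executor, process, range(total), max_in_flight=8))
    print(res)
```

With this pattern a 'source' progress bar wrapped around the input and a 'sink' bar wrapped around the output would advance roughly together, which is what I see with multiprocessing but not with my dask version.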