I have called map_partitions() on a Dask series, and for every partition I am returning either True or False. I want to stop running the pending map_partitions() tasks as soon as any partition returns True. Please guide!
Hi @Gateway2745, welcome to this forum!
Have you put something together or tried anything yet?
I’ve played a little with your use case, and here is what I came up with:
import time

import dask.dataframe as dd
from distributed import Client, as_completed

# Start a client and a local distributed cluster
client = Client()

# Generate fake data: 26 rows split into 13 partitions
ddf = dd.from_dict(
    dict(a=list('abcdefghijklmnopqrstuvwxyz'), b=list(range(26))),
    npartitions=13,
)

def my_func(df):
    bsum = df.b.sum()
    # Simulate work that takes longer for later partitions
    time.sleep(bsum / 5)
    return bool(bsum > 26)

# One delayed object per partition
delayed_partitions_calls = ddf.map_partitions(my_func).to_delayed()

# Start every delayed and get the corresponding futures
future_results = client.compute(delayed_partitions_calls)

# Loop over partition results as soon as they arrive, and break on the first True
for future in as_completed(future_results):
    res = future.result()
    if res:
        break

# Cancel everything that is still pending
client.cancel(future_results)
I think it does the job:
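- Convert the DataFrame partitions to delayed objects with to_delayed()
- Use the Futures API through the Client: client.compute() submits the delayed objects and returns one future per partition
- Use as_completed to read the results as soon as they arrive, break on the first True, and then cancel the remaining futures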
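If it helps to see the control flow in isolation, here is a minimal sketch of the same submit / as_completed / cancel pattern using only the standard library's concurrent.futures instead of Dask. The names check_partition and any_partition_true, the thread pool, and the plain lists standing in for partitions are all my own illustrative assumptions, not part of the Dask API. It also shows one caveat: cancelling only prevents tasks that have not started yet, and as far as I know the same holds for client.cancel, so anything already running will still finish.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
import time


def check_partition(values):
    """Hypothetical stand-in for the per-partition function."""
    total = sum(values)
    time.sleep(total / 500)  # simulate work proportional to the data
    return total > 26


def any_partition_true(partitions, max_workers=2):
    """Return (found, cancelled): whether any partition returned True,
    and how many not-yet-started tasks were cancelled."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(check_partition, p) for p in partitions]
        found = False
        # Consume results in completion order and stop at the first True
        for future in as_completed(futures):
            if future.result():
                found = True
                break
        # cancel() returns True only for tasks that had not started yet
        cancelled = sum(f.cancel() for f in futures)
    return found, cancelled


# 13 "partitions" of two values each, mirroring the fake data above
partitions = [[i, i + 1] for i in range(0, 26, 2)]
found, cancelled = any_partition_true(partitions)
print(found, cancelled)
```

How many tasks actually get cancelled depends on timing and on the number of workers, so I would not rely on an exact count.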
Does it solve your problem?