I have called map_partitions() on a Dask series, and for every partition I am returning either True or False. I want to stop running the pending map_partitions() tasks as soon as any partition returns True. Please guide!
Hi @Gateway2745, welcome to this forum!
Did you try putting something together yourself?
I’ve played a little with your use case, and here is what I came up with:
```python
import time

import dask.dataframe as dd
from distributed import Client, as_completed

# Start a client and distributed cluster
client = Client()

# Generate fake data
ddf = dd.from_dict(dict(a=list('abcdefghijklmnopqrstuvwxyz'), b=list(range(26))), npartitions=13)

def my_func(df):
    bsum = df.b.sum()
    time.sleep(bsum / 5)  # simulate some work
    return bsum > 26

# Convert each partition's computation to a Delayed object
delayed_partitions_calls = ddf.map_partitions(my_func).to_delayed()

# Start every Delayed and get the corresponding Futures
future_results = client.compute(delayed_partitions_calls)

# Loop over partition results as soon as they arrive, and break on the first True
for future in as_completed(future_results):
    res = future.result()
    if res:
        break

# Cancel everything still pending
client.cancel(future_results)
```
I think it does the job:
- Convert the DataFrame partitions to Delayed objects
- Use the Futures API through the Client: convert the Delayed objects to Futures by submitting them
- Use as_completed to gather the results as soon as they arrive
Does it solve your problem?
Hi @guillaumeeb. Thank you for your solution! Yes, I did find a solution very similar to yours: I created a function, wrapped it in dask.delayed(), and then looped over the dataframe partitions, calling this delayed function on each one. That is more verbose than your map_partitions() followed by to_delayed(). Thanks again!