Cancel map_partitions() based on condition

I have called map_partitions() on a Dask series, and for every partition I am returning either True or False. I want to stop running pending map_partitions() tasks if a particular partition returns True. Please guide!

Hi @Gateway2745, welcome to this forum!

Have you already tried something yourself? What did you come up with?

I’ve played around a little with your use case, and here is what I came up with:

import dask.dataframe as dd
from distributed import Client, as_completed
import time

# Start a client and distributed cluster
client = Client()

# Generate fake data
ddf = dd.from_dict(dict(a=list('abcdefghijklmnopqrstuvwxyz'), b=list(range(26))), npartitions=13)

def my_func(df):
    bsum = df.b.sum()
    # Simulate a longer-running task
    time.sleep(bsum / 5)
    return bsum > 26

delayed_partitions_calls = ddf.map_partitions(my_func).to_delayed()

# Start every delayed and get the corresponding futures
future_results = client.compute(delayed_partitions_calls)

# Loop on every partition result as soon as they arrive, and just break when True
for future in as_completed(future_results):
    res = future.result()
    if res:
        break

# Cancel everything
client.cancel(future_results)

I think it does the job:

  1. Convert the DataFrame partitions to Delayed objects with to_delayed()
  2. Use the Futures API through the Client: submit the Delayed objects with client.compute() to get Futures
  3. Use as_completed to process each result as soon as it arrives, break on the first True, and cancel the remaining Futures

Does it solve your problem?


Hi @guillaumeeb . Thank you for your solution! Yes, I did find a solution very similar to yours. I created a function and wrapped it in dask.delayed(). I then looped over the dataframe partitions and called this delayed function on each partition. This is a more lengthy process compared to your map_partitions() followed by to_delayed(). Thanks again!
