DataFrame + distributed + secede

I have a Dask task that looks a bit like this:

# do some setup work
df = df.apply().persist()
return df

So the idea is that the task submits a large number of jobs via apply(), but doesn’t actually return until all those jobs are complete.

However, this causes a deadlock, because some of the apply jobs get scheduled on the same worker as the parent task. The parent task is waiting for its children before it completes, but the child tasks can't start because the worker is "processing" the parent.

So this seems like the exact scenario described here in the docs, and it's evident to me that I need to call secede() and rejoin() somewhere. However, it's not clear to me where those calls should go, given that I'm using the higher-level DataFrame wrapper. Should I run secede() before or after persist()? What about rejoin()?

So from trial and error, it seems like you have to do this:

secede()
result = df.apply().persist()
wait(result)
rejoin()

Every other order seems to cause a deadlock, for reasons I can't entirely explain.


@multimeric Thanks for the question!

I’m not entirely sure about this: is your workflow actually launching tasks from tasks? I’m asking because it could be something else in the workflow, or it could also be a bug. Would you be able to share a reproducible example? I’d be happy to try it out and help confirm/debug.

Since you’re using secede+rejoin, I’d also suggest looking at these docs if you haven’t already. Especially the worker_client context manager: Launch Tasks from Tasks — Dask.distributed 2022.5.0 documentation

Okay, I can actually reproduce this locally. If you run this, the whole cluster gets into a deadlock and processes nothing.

import dask.dataframe as dd
import pandas as pd
from dask.distributed import Client, wait

def apply(row):
    print(f"Applying {row}")
    return True

def parent_task(df):
    print("Running parent task")
    fut = df.apply(apply, axis=1, meta=("foo", bool)).persist()
    wait(fut)  # occupies this worker's only thread while the apply tasks wait for it

if __name__ == "__main__":
    client = Client(n_workers=1, threads_per_worker=1)
    df = pd.DataFrame({"a": range(100)}).pipe(dd.from_pandas, npartitions=20)
    fut = client.submit(parent_task, df)
    fut.result()  # never returns: the cluster is deadlocked

You might say this is caused by only having one worker, and it is, but my real-life example uses dask_jobqueue with several workers. The scheduler assigns the apply tasks to the same worker as the parent task and doesn't redistribute them, so the same deadlock occurs. Any mechanism that stops tasks from being redistributed will cause this.

@multimeric Thank you for sharing an example. You’re right that this is a deadlock and we do need to use secede+rejoin as you’ve done. Your usage is also accurate. :smile:

@multimeric may I ask what causes the need to call wait() inside a task? This is something we’re planning to redesign and we would like to collect people’s use cases.

Also, unless something else holds a reference to the persisted dataset by the time your function returns, you will lose your data if, for any reason at any moment, the task output is spilled or moved to another worker. You should use publish_dataset to prevent this.


Yeah, so I was using wait() because I wanted to trigger the computation on a per-task basis. If I didn't do this, the workflow would be a chain of lazy task definitions that only evaluate at the very end of the workflow. Technically this is ideal, because it lets everything get optimised, but I'm using Prefect (which wraps around Dask), and if I don't trigger the computation per parent task, I don't get to checkpoint (save artifacts for) my workflow. From a Prefect perspective it would also mean that the first $n-1$ tasks take 1 second to run and the last task takes several days, which is very weird to reason about.

In the end I removed the wait() and replaced it with .to_parquet(), which I needed to do anyway as part of the checkpointing. It triggers computation in the same way as wait(), but for some reason doesn't cause a deadlock.

That said, I'm somewhat tempted to drop Prefect and just use Dask directly, because it doesn't really offer much on top of Dask except checkpointing, and that's easy enough to implement as a utility function or something.

Let me know if you need any clarifications.