Operations on a partitioned DataFrame not actually distributed across workers

I have a Dask DataFrame with 50 partitions, and 4 workers in my distributed Dask cluster (using dask-jobqueue). However, whenever I run a long-running task (e.g. an apply which takes a few hours on a single process), the entire task runs on a single worker node instead of being distributed. Why is this happening? I wonder if, because the DataFrame can physically fit into memory on one worker, Dask isn’t bothering to distribute it? But I want to process it with multiple processes, which is the whole reason I’m using Dask. Can I force it to distribute somehow?

From the dashboard, I can tell that only 1 (or in this case, 2) workers are being used:

(dashboard screenshot)

A loose sketch of how this is set up:

cluster = SLURMCluster(n_workers=4, ...)
df = dask.dataframe.from_pandas(..., npartitions=50)
df.apply(...).persist().compute()

Here’s a more horrifying example:
(dashboard screenshot)

8 workers, with 100 tasks on one of them and 0 tasks on almost all of the others.


@multimeric Thanks for your question! Have you resolved this already?

If not, could you please share:

  • How big is your pandas DataFrame (because it will affect the actual number of partitions created)?
  • Are your partitions balanced?
  • Does the task graph for your computation, df.apply(...).visualize(), show what you expect?

It’ll also be helpful if you can share a minimal, reproducible example, perhaps with LocalCluster. :smile:

Of course it would be great if I had a reproducible example, but that’s very tricky to produce, between the complexity of the code and the specific cluster conditions under which this happens.

I guess my DataFrame is on the order of tens or hundreds of megabytes (so not huge). It’s probably not balanced, because I partitioned using npartitions instead of a fixed partition size. Should I be balancing my partitions?

> It’s probably not balanced, because I’ve partitioned using npartitions instead of using a fixed size. Should I be balancing my partitions?

It shouldn’t be a big problem; I’d just make sure you don’t have any empty partitions.

Based on your other question about deadlocks, I’m wondering if this is related to: work stealing seems to not occur fully when launching tasks from tasks · Issue #4945 · dask/distributed · GitHub
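If work stealing is a suspect, one quick thing to check (a sketch; this just reads the config value the scheduler uses) is whether stealing is enabled at all:

```python
import distributed  # importing registers the distributed config defaults
import dask

# Work stealing is on by default; it can be turned off via config files
# or the DASK_DISTRIBUTED__SCHEDULER__WORK_STEALING environment variable
print(dask.config.get("distributed.scheduler.work-stealing"))
```

If it reads True and the tasks still pile up on one worker, that points more toward the scheduling behaviour described in the issue above than toward local misconfiguration.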