Diagnostics for `DataFrame.compute()`

Assuming we have a dask.dataframe.DataFrame, if we run .compute() we never gain access to the underlying Future objects, meaning that we can’t use distributed.diagnostics.progressbar.progress. Also, distributed.get_task_stream seems to provide access to task metadata only after .compute() has finished.

How then, can we get a progress bar or other diagnostics for tasks that are run using .compute()?

Note that this Stack Overflow question poses many of the same questions: “Dask diagnostics - progress bar with map_partition / delayed”.

Oh, and while I think it is possible to use:

ddf = ddf.map_partitions(..)
fut = client.compute(ddf)
progress(fut)

This seems like an issue, because client.compute hands back Futures rather than repacking the collection into a pandas.DataFrame, which is not very user-friendly.
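For concreteness, here’s a sketch of that workaround using the timeseries demo data; blocking on the future with .result() does eventually hand back a pandas.DataFrame, but it’s extra ceremony compared to a plain .compute():

from dask.datasets import timeseries
from dask.distributed import Client, progress

client = Client()  # assumes a local distributed cluster is fine here

ddf = timeseries()
fut = client.compute(ddf)
progress(fut)

# blocking on the future repacks the result into a pandas.DataFrame,
# but it's an extra step compared to a plain .compute()
df = fut.result()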

Hi @multimeric and welcome to Discourse!

This is a great question! There are actually a few different ways to see a progress bar in Dask:

  1. The dask.distributed progress bar is intended to be used with Futures (as you noted).
  2. The dask.diagnostics progress bar can be used with the local task scheduler and provides feedback during computation:
import dask.dataframe as dd
from dask.datasets import timeseries
from dask.diagnostics import ProgressBar

ddf = timeseries()
res = ddf.reset_index().groupby('name')['x'].mean()
with ProgressBar():
    res.compute()
# out: [########################################] | 100% Completed |  0.3s
  3. The progress bar in the Dask dashboard provides live feedback when you use the Dask distributed scheduler. Improved documentation on the dashboard will be available in the next Dask release, but for now you can also check out this post for more examples of how to use the dashboard:
import dask.dataframe as dd
from dask.datasets import timeseries
from dask.distributed import Client, LocalCluster

# use the distributed scheduler
cluster = LocalCluster()
client = Client(cluster)

ddf = timeseries()
res = ddf.reset_index().groupby('name')['x'].mean()
res.compute()
# see "progress" tab on bokeh-powered dashboard

Thanks for the helpful answer. Does this mean that, if we’re using the distributed scheduler, the only diagnostic we can use is the web UI, and we can’t get anything printed to stdout/logging?

Nope, not at all! There are a number of options for logging: the Dask distributed logging page has an overview, and for more specific guidance on controlling logged output I’d recommend the logging section on the debugging page.


Thanks for the pointers. Following that guide I was able to determine that I can get the task transitions if I do the following:

import logging

logging.getLogger("distributed.scheduler").setLevel(logging.DEBUG)

This gives me logs such as:

distributed.scheduler - DEBUG - Stimulus task finished ('from_pandas-2b3f78b4bd17e79620af6e53e647318e', 2), tcp://127.0.0.1:42181
distributed.scheduler - DEBUG - Stimulus task finished ('from_pandas-2b3f78b4bd17e79620af6e53e647318e', 0), tcp://127.0.0.1:42201
distributed.scheduler - DEBUG - Stimulus task finished ('from_pandas-2b3f78b4bd17e79620af6e53e647318e', 1), tcp://127.0.0.1:45229
distributed.scheduler - DEBUG - Stimulus task finished ('from_pandas-2b3f78b4bd17e79620af6e53e647318e', 3), tcp://127.0.0.1:41599
distributed.scheduler - DEBUG - Stimulus task finished ('from_pandas-2b3f78b4bd17e79620af6e53e647318e', 5), tcp://127.0.0.1:42201
distributed.scheduler - DEBUG - Stimulus task finished ('from_pandas-2b3f78b4bd17e79620af6e53e647318e', 6), tcp://127.0.0.1:45229
distributed.scheduler - DEBUG - Stimulus task finished ('from_pandas-2b3f78b4bd17e79620af6e53e647318e', 4), tcp://127.0.0.1:42181
distributed.scheduler - DEBUG - Stimulus task finished ('from_pandas-2b3f78b4bd17e79620af6e53e647318e', 7), tcp://127.0.0.1:41599
distributed.scheduler - DEBUG - Stimulus task finished ('explode-18c5ea4301bfdc82a4139c4302919b1a', 2), tcp://127.0.0.1:42181
distributed.scheduler - DEBUG - Stimulus task finished ('explode-18c5ea4301bfdc82a4139c4302919b1a', 1), tcp://127.0.0.1:45229
distributed.scheduler - DEBUG - Stimulus task finished ('explode-18c5ea4301bfdc82a4139c4302919b1a', 3), tcp://127.0.0.1:41599
distributed.scheduler - DEBUG - Stimulus task finished ('explode-18c5ea4301bfdc82a4139c4302919b1a', 0), tcp://127.0.0.1:42201
distributed.scheduler - DEBUG - Stimulus task finished ('explode-18c5ea4301bfdc82a4139c4302919b1a', 7), tcp://127.0.0.1:45229
distributed.scheduler - DEBUG - Stimulus task finished ('explode-18c5ea4301bfdc82a4139c4302919b1a', 4), tcp://127.0.0.1:42181
distributed.scheduler - DEBUG - Stimulus task finished ('explode-18c5ea4301bfdc82a4139c4302919b1a', 6), tcp://127.0.0.1:41599
distributed.scheduler - DEBUG - Stimulus task finished ('explode-18c5ea4301bfdc82a4139c4302919b1a', 5), tcp://127.0.0.1:42201

It’s great to be able to see this level of detail. This is generally what I want: the individual subtasks (I guess they’re called stimulus tasks?). But it would be great to get these without all the other logs I get inundated with at such a verbose log level. Ideally it would even print out all of the state transitions, such as task submission, not just task completion.
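One stopgap that occurs to me: since these are ordinary Python log records, a standard logging.Filter attached to the scheduler logger should be able to keep just the transition messages, something like:

import logging

class TaskTransitionFilter(logging.Filter):
    """Only pass scheduler records that mention a task transition."""
    def filter(self, record):
        return "Stimulus task finished" in record.getMessage()

logger = logging.getLogger("distributed.scheduler")
logger.setLevel(logging.DEBUG)            # DEBUG is needed to emit the records at all
logger.addFilter(TaskTransitionFilter())  # then drop everything but the transitions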

I also wonder: if the logs have this level of detail, why can’t there be a progress bar context manager for the distributed scheduler, just like the one for the local scheduler?


Glad to help! To further customize logging, I think you’d want to use structured logs. Additionally, the scheduler, workers, and client all emit logs using Python’s standard logging module, so you can hook into them with the usual handlers and filters (for more on this, see this Discourse topic).

Also, you’re welcome to submit a feature request to dask/distributed regarding the progress bar.
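As a rough sketch of what structured logs look like (the topic name here is made up, and this assumes a running Client):

from dask.distributed import Client

client = Client()  # assumes a local cluster for demonstration

# emit a structured event under a custom (hypothetical) topic...
client.log_event("compute-progress", {"stage": "groupby", "status": "done"})

# ...then retrieve the events for that topic from the scheduler
print(client.get_events("compute-progress"))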

@multimeric there is a feature of dask.distributed.progress that I should have pointed out earlier: you can use it to track the progress of a Dask object that is executing in the background (as noted here in the Dask docs). Here’s a short example:

import dask.dataframe as dd
from dask.datasets import timeseries
from dask.distributed import Client, LocalCluster, progress

cluster = LocalCluster()
client = Client(cluster)

ddf = timeseries()
res = ddf.reset_index().groupby('name')['x'].mean().persist()
progress(res)
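Since persist keeps the finished results in cluster memory, a follow-up .compute() on the same object just gathers them, so you still end up with a plain pandas object:

# after the progress bar completes, this returns quickly because the
# results are already held in cluster memory
final = res.compute()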



Perfect, this does exactly what I want! Thank you for pointing this out!
