Using ProgressBar() with distributed Client

Hi everyone,
I want to use the dask.diagnostics.ProgressBar together with a dask.distributed.Client.
Please consider the following code example, where a delayed function is called with an int as argument and returns an array:

import numpy as np
import dask
import dask.array as da
from dask.distributed import Client
from dask.diagnostics import ProgressBar
import time

client = Client(processes=True, threads_per_worker=1, n_workers=2, memory_limit='2GB')

@dask.delayed
def some_func(input):
    time.sleep(2)
    return np.arange(input)

pbar = ProgressBar()
pbar.register()
  
arr = da.arange(5)
results = []
for value in arr:
    batch_res = some_func(value)
    results.append(batch_res)

# If client is used:
final_res = client.gather(client.compute(results))
# If no client is used:
final_res = dask.compute(results)
  
pbar.unregister()

When client = Client(…) is used, the ProgressBar() does not work.
If the line is commented out and dask.compute(results) is called, the ProgressBar works.

I would like to get a ProgressBar for my client as well. Is there any way to achieve this?

Best and thanks in advance.

Thanks for your question, @jannk !

I wasn’t able to reproduce this by running your code as is: I didn’t see the ProgressBar when running final_res = dask.compute(results). However, I could see it if I closed the client with client.close() before this statement.

The ProgressBar() available in dask.diagnostics is mainly for local computations, i.e., when you use the single-machine schedulers. (That’s why you can see it after you close the client: Dask goes back to a local scheduler.)
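
To make the local case concrete, here is a minimal sketch (not taken from the original post). It runs the delayed tasks on the threaded single-machine scheduler and uses ProgressBar as a context manager. The shortened sleep and the argument name n are my own choices for illustration. Passing scheduler="threads" explicitly is an alternative to closing the client, since an explicit scheduler argument should take precedence over the default one.

```python
import time

import numpy as np
import dask
from dask.diagnostics import ProgressBar


@dask.delayed
def some_func(n):
    time.sleep(0.2)  # shortened from the 2 s in the question
    return np.arange(n)


results = [some_func(n) for n in range(5)]

# ProgressBar hooks into the single-machine schedulers, so request one explicitly.
with ProgressBar():
    final_res = dask.compute(*results, scheduler="threads")
```

Here final_res is a tuple with one array per delayed call.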

When you use Client, you’re automatically using the distributed scheduler, which comes with an extensive diagnostic dashboard (available at client.dashboard_link). In this dashboard, the progress bar is under the ‘Status’ tab. (You can also access this dashboard through the Dask JupyterLab Extension if you use JupyterLab.)
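
For reference, a small sketch of looking up the dashboard address. The cluster settings here (processes=False, a single worker, and dashboard_address=":0" to pick any free port) are assumptions chosen to keep the example light, not what the original code used. The dashboard itself needs bokeh to be installed.

```python
from dask.distributed import Client

# In-process cluster: an assumption to keep the sketch cheap to start.
client = Client(
    processes=False,
    n_workers=1,
    threads_per_worker=1,
    dashboard_address=":0",  # let the OS choose a free port
)

link = client.dashboard_link  # open this URL in a browser and go to the 'Status' tab
print(link)

client.close()
```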

If you want to view only the progress, you can also use the progress() function available in dask.distributed:

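from dask.distributed import progress

futures = client.compute(results)   # submit to the cluster; returns futures right away
progress(futures)                   # show progress while the tasks run
final_res = client.gather(futures)  # collect the results once they are done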
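
For completeness, here is a self-contained variant of this idea, as a sketch rather than a definitive recipe. It keeps the futures returned by client.compute(), hands them to progress(), and gathers the results only afterwards. The in-process cluster (processes=False), the dashboard_address=":0" setting, the shortened sleep, and the argument name n are assumptions I made so the snippet runs quickly as a plain script.

```python
import time

import numpy as np
import dask
from dask.distributed import Client, progress


@dask.delayed
def some_func(n):
    time.sleep(0.2)  # shortened from the 2 s in the question
    return np.arange(n)


# In-process workers: an assumption so that no __main__ guard is needed.
client = Client(
    processes=False,
    n_workers=2,
    threads_per_worker=1,
    dashboard_address=":0",
)

results = [some_func(n) for n in range(5)]

futures = client.compute(results)   # list of futures, returned immediately
progress(futures)                   # progress display for the distributed scheduler
final_res = client.gather(futures)  # list of NumPy arrays

client.close()
```

As far as I know, progress() shows a text bar and waits for completion in a plain console, while in a notebook it displays a widget and returns immediately.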

Also, here’s a blog that covers all of this in greater detail. Does this help answer your question?


Hi @pavithraes,
thank you very much for your detailed answer!
It helped me a lot and also corrected my assumption that local means ‘on this machine’ and distributed means ‘on other machines’, e.g. via SSH. Since I only ever start everything on my own computer, or run the program on the cluster without any job scheduler (SLURM etc.), I had assumed this still counted as ‘local’, which was of course wrong.
