Naming a ProgressBar

We are using Dask in a script that wraps several distinct Dask-coordinated operations. We’ve started using the ProgressBar feature to provide feedback as the script progresses: we register a progress bar when the script starts, which then produces a succession of progress bars as the underlying operations execute.

We have logging to provide additional information about what is happening, but it would be cleanest to be able to label the progress bar itself, perhaps even by looking up some representative string from the task graph. Is this possible?

There are some relevant suggestions to use tqdm here, which would give the ability to label a progress bar, but it would still be helpful to be able to set this label dynamically based on the current operation.
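For context, tqdm ships a Dask callback that takes a static label through its desc keyword (in recent tqdm versions, as far as I can tell). A minimal sketch, assuming a Dask DataFrame ddf and a transform function:

from tqdm.dask import TqdmCallback

# `desc` gives the bar a label, but it is fixed when the callback is
# registered -- it cannot change per operation, which is the limitation here.
with TqdmCallback(desc="transforming"):
    result = ddf.map_partitions(transform).compute()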

Hi @Peter_Saffrey, welcome to the Dask community!

I don’t see this feature when looking at the documentation, but perhaps the code could be modified.

With a quick search, I don’t see any tqdm mention here…

In order to better help, could you provide an MVCE demonstrating what you are trying to achieve?

Also, keep in mind that ProgressBar won’t work if you start using a Distributed cluster.
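With Distributed, you’d use its own progress function instead. A minimal sketch, assuming a Dask collection ddf:

from dask.distributed import Client, progress

client = Client()    # local Distributed cluster
ddf = ddf.persist()  # start computing in the background
progress(ddf)        # Distributed's own progress bar (dask.diagnostics won't report here)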

Oh, my apologies, I pasted the same link twice. For tqdm I meant this link.

I’ll try to come up with a minimal example to show what I mean.

Using my AI assistant, I created the following code snippet.

import dask.dataframe as dd
import pandas as pd
import numpy as np
import time
from dask.diagnostics import ProgressBar

# Global progress bar setup
global_progress_bar = ProgressBar()

def generate_large_dataset(num_rows=10_000_000):
    np.random.seed(42)
    
    def generate_memory_efficient_column(size):
        base = np.random.normal(0, 1, size)
        return np.round(base, 4)
    
    data = {
        'id': np.arange(num_rows),
        'value1': generate_memory_efficient_column(num_rows),
        'value2': generate_memory_efficient_column(num_rows),
        'category': np.random.choice(['A', 'B', 'C', 'D'], num_rows)
    }
    
    return dd.from_pandas(pd.DataFrame(data), npartitions=10)

def data_transformation_module(df):
    def memory_efficient_transform(partition):
        result = partition.copy()
        result['processed_value'] = (
            np.abs(result['value1']) ** 1.5 + 
            np.log1p(np.abs(result['value2']))
        )
        # Note: max() here is per partition, not a global maximum
        result['processed_value'] = np.round(
            result['processed_value'] / result['processed_value'].max(),
            4
        )
        return result
    
    # This computation will now use the global progress bar
    cleaned_df = df.map_partitions(memory_efficient_transform)
    return cleaned_df.compute()

def aggregation_module(cleaned_df):
    def efficient_aggregation(group):
        return pd.Series({
            'mean_processed': group['processed_value'].mean(),
            'std_processed': group['processed_value'].std(),
            'count': len(group)
        })
    
    # This computation will also use the global progress bar
    grouped_results = (
        dd.from_pandas(cleaned_df, npartitions=5)
        .groupby('category')
        .apply(efficient_aggregation, 
               meta={
                   'mean_processed': 'float64',
                   'std_processed': 'float64',
                   'count': 'int64'
               })
        .compute()
    )
    return grouped_results

def filtering_module(cleaned_df):
    def memory_efficient_filter(df):
        filtered = df[
            (df['processed_value'] > df['processed_value'].quantile(0.75)) & 
            (df['category'].isin(['A', 'B']))
        ]
        return filtered.nlargest(500, 'processed_value')
    
    # Pure pandas at this point (cleaned_df was already computed), so no progress bar appears for this step
    return memory_efficient_filter(cleaned_df)

def complex_computation(df):
    # Register the global progress bar once
    with global_progress_bar:
        # Deep processing modules can now work without directly managing progress bars
        cleaned_df = data_transformation_module(df)
        grouped_results = aggregation_module(cleaned_df)
        final_results = filtering_module(cleaned_df)
    
    return grouped_results, final_results

def main():
    large_df = generate_large_dataset()
    
    start_time = time.time()
    grouped_results, top_results = complex_computation(large_df)
    end_time = time.time()
    
    print(f"Total processing time: {end_time - start_time:.2f} seconds")
    print("\nGrouped Results:")
    print(grouped_results)
    print("\nTop Processed Entries:")
    print(top_results)

if __name__ == '__main__':
    main()

The output looks like this:

[########################################] | 100% Completed | 407.36 ms
[########################################] | 100% Completed | 1.63 s
Total processing time: 4.01 seconds
(snip)

A crucial element of this code is that the ProgressBar is declared at the top level and is then used throughout the underlying execution, even though that execution has multiple parts. We want this separation because otherwise we have to blend UI elements (ProgressBars) with computational elements (the dataframe operations), which makes the code more difficult for us to manage.

However, I’d still like each ProgressBar to provide detail on what’s going on. Something like this:

Transforming: [########################################] | 100% Completed | 407.36 ms
Aggregating: [########################################] | 100% Completed | 1.63 s
Total processing time: 4.01 seconds

So I guess I have two questions:

  1. Can the ProgressBar be extended so that it can consume and render a string label?
  2. Can this label be inferred from the operation? This could be done either by trying to guess a label from the task graph in use, or by explicitly giving a name to an operation as it is executed, which could then be used to render the ProgressBar.

As a Dask newbie, (1) sounds easier than (2) :slight_smile: We tried to look at this by cloning the ProgressBar code to see if we could pull out a suitable string from the task graph objects (for example from self._state here), but without success.

Thanks for the details here!

Well, clearly, it would be easy to add an optional label to the ProgressBar. But it looks like that wouldn’t fit your use case, since the ProgressBar is declared at the top level.
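For the record, a rough sketch of what that could look like. It overrides the private _draw_bar hook and reads the private _width/_file attributes, so it could break between Dask versions:

from dask.diagnostics import ProgressBar
from dask.utils import format_time

class LabeledProgressBar(ProgressBar):
    # Sketch only: prefixes every redraw with a fixed label
    def __init__(self, label="", **kwargs):
        super().__init__(**kwargs)
        self._label_prefix = label

    def _draw_bar(self, frac, elapsed):
        bar = "#" * int(self._width * frac)
        percent = int(100 * frac)
        msg = "\r{0}[{1:<{2}}] | {3}% Completed | {4}".format(
            self._label_prefix, bar, self._width, percent, format_time(elapsed)
        )
        try:
            self._file.write(msg)
            self._file.flush()
        except ValueError:
            pass

# Usage: with LabeledProgressBar(label="Transforming: "): ... .compute()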

The other part is possible, but I don’t think you can really have any control over what is in the Dask graph or the ongoing tasks. From what I see, it’s just a set of tasks.

I tried to do something like:

def _update_bar(self, elapsed):
    s = self._state
    if not s:
        self._draw_bar(0, elapsed)
        return
    ndone = len(s["finished"])
    ntasks = sum(len(s[k]) for k in ["ready", "waiting", "running"]) + ndone
    # Pick the key of an arbitrary running (or finished) task as the label
    if s["running"]:
        self._label = next(iter(s["running"]))
    elif s["finished"]:
        self._label = next(iter(s["finished"]))
    self._draw_bar(ndone / ntasks if ntasks else 0, elapsed)

and using this label when drawing the bar, but task names are not really meaningful.
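To illustrate, here is a quick sketch printing the keys of a small graph; they are machine-generated tokens rather than human-friendly labels:

import pandas as pd
import dask.dataframe as dd

ddf = dd.from_pandas(pd.DataFrame({"x": range(100)}), npartitions=4)
total = ddf.x.sum()

# Keys look like ('from_pandas-<token>', 0) or 'sum-agg-<token>'
print(list(total.__dask_graph__())[:5])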

But looking at this example, shouldn’t you just use some print/logging statements before the two functions?
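Something like this, reusing the functions from your snippet (the label has to go on its own line, since the bar redraws itself with a carriage return):

import logging
from dask.diagnostics import ProgressBar

logging.basicConfig(level=logging.INFO, format="%(message)s")

def complex_computation(df):
    logging.info("Transforming:")
    with ProgressBar():
        cleaned_df = data_transformation_module(df)

    logging.info("Aggregating:")
    with ProgressBar():
        grouped_results = aggregation_module(cleaned_df)

    final_results = filtering_module(cleaned_df)  # pure pandas, no bar
    return grouped_results, final_results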

Hi @guillaumeeb ,

Thanks a lot for continuing to help me with this. Yes, we are trying to use logging lines before each potentially long-running operation, which can act as labels for the progress bars. It’s just that for more complex operations, the data might go through a number of stages, and each one will get its own progress bar. This becomes harder when the number of progress bars can vary with the size of the input data.

Perhaps my option (2) here is too hard to solve in the first instance, but I think adding support for tqdm-style labels for progress bars would still be pretty useful and would move us in the right direction. Would you be open to me making a PR for that?
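To make the proposal concrete, I’m imagining a hypothetical API along these lines:

# Hypothetical keyword -- `label` is not part of Dask's ProgressBar today
with ProgressBar(label="Transforming: "):
    cleaned_df = data_transformation_module(df)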

Thanks,

Peter

I’m not one of the Dask maintainers, but I certainly see the point, and since the changes should not be heavy, I greatly encourage you to do so!
