Using my AI assistant, I created the following code snippet.
import time

import dask.dataframe as dd
import numpy as np
import pandas as pd
from dask.diagnostics import ProgressBar

# Global progress bar setup
global_progress_bar = ProgressBar()


def generate_large_dataset(num_rows=10_000_000):
    np.random.seed(42)

    def generate_memory_efficient_column(size):
        base = np.random.normal(0, 1, size)
        return np.round(base, 4)

    data = {
        'id': np.arange(num_rows),
        'value1': generate_memory_efficient_column(num_rows),
        'value2': generate_memory_efficient_column(num_rows),
        'category': np.random.choice(['A', 'B', 'C', 'D'], num_rows)
    }
    return dd.from_pandas(pd.DataFrame(data), npartitions=10)


def data_transformation_module(df):
    def memory_efficient_transform(partition):
        result = partition.copy()
        result['processed_value'] = (
            np.abs(result['value1']) ** 1.5 +
            np.log1p(np.abs(result['value2']))
        )
        # Note: this normalises against the per-partition maximum,
        # not the global maximum
        result['processed_value'] = np.round(
            result['processed_value'] / result['processed_value'].max(),
            4
        )
        return result

    # This computation will use the global progress bar
    cleaned_df = df.map_partitions(memory_efficient_transform)
    return cleaned_df.compute()


def aggregation_module(cleaned_df):
    def efficient_aggregation(group):
        return pd.Series({
            'mean_processed': group['processed_value'].mean(),
            'std_processed': group['processed_value'].std(),
            'count': len(group)
        })

    # This computation will also use the global progress bar
    grouped_results = (
        dd.from_pandas(cleaned_df, npartitions=5)
        .groupby('category')
        .apply(efficient_aggregation,
               meta={
                   'mean_processed': 'float64',
                   'std_processed': 'float64',
                   'count': 'int64'
               })
        .compute()
    )
    return grouped_results


def filtering_module(cleaned_df):
    def memory_efficient_filter(df):
        filtered = df[
            (df['processed_value'] > df['processed_value'].quantile(0.75)) &
            (df['category'].isin(['A', 'B']))
        ]
        return filtered.nlargest(500, 'processed_value')

    # cleaned_df is already a plain pandas DataFrame at this point, so
    # this step runs no Dask computation and triggers no progress bar
    return memory_efficient_filter(cleaned_df)


def complex_computation(df):
    # Register the global progress bar once
    with global_progress_bar:
        # Deep processing modules can now work without directly
        # managing progress bars
        cleaned_df = data_transformation_module(df)
        grouped_results = aggregation_module(cleaned_df)
        final_results = filtering_module(cleaned_df)
    return grouped_results, final_results


def main():
    large_df = generate_large_dataset()
    start_time = time.time()
    grouped_results, top_results = complex_computation(large_df)
    end_time = time.time()
    print(f"Total processing time: {end_time - start_time:.2f} seconds")
    print("\nGrouped Results:")
    print(grouped_results)
    print("\nTop Processed Entries:")
    print(top_results)


if __name__ == '__main__':
    main()
The output looks like this:
[########################################] | 100% Completed | 407.36 ms
[########################################] | 100% Completed | 1.63 s
Total processing time: 4.01 seconds
(snip)
A crucial element of this code is that the ProgressBar is declared at the top level and is then used throughout the underlying execution, even though that execution has multiple parts. We want this separation because otherwise we would have to blend UI elements (progress bars) with computational elements (the dataframe operations), which makes the code more difficult for us to manage.
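As an aside, the same global setup can be achieved by registering the callback instead of using it as a context manager; as far as I understand the documented API, that looks like this:

from dask.diagnostics import ProgressBar

pbar = ProgressBar()
pbar.register()    # from here on, every compute() renders this bar

# ... any number of compute() calls, in any module ...

pbar.unregister()  # stop rendering bars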
However, I’d still like each ProgressBar to provide detail on what’s going on. Something like this:
Transforming: [########################################] | 100% Completed | 407.36 ms
Aggregating: [########################################] | 100% Completed | 1.63 s
Total processing time: 4.01 seconds
So I guess I have two questions:
- Can the ProgressBar be extended so that it can consume and render a string label?
- Can this label be inferred from the operation? This could be done either by trying to guess a label from the task graph in use, or by explicitly naming an operation as it is executed, so that the name can then be used when rendering the ProgressBar.
As a Dask newbie, (1) sounds easier than (2).
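For (1), the kind of extension I have in mind is a small subclass that overrides the bar-drawing method. This is only a sketch: _draw_bar, _width and _file are undocumented ProgressBar internals that may change between Dask versions, and LabeledProgressBar is just a name I made up.

from dask.diagnostics import ProgressBar
from dask.utils import format_time

class LabeledProgressBar(ProgressBar):
    """Sketch: a ProgressBar that prefixes the bar with a fixed label."""

    def __init__(self, label='', **kwargs):
        super().__init__(**kwargs)
        self._label = label

    def _draw_bar(self, frac, elapsed):
        # Same layout as ProgressBar._draw_bar, plus the label prefix
        bar = '#' * int(self._width * frac)
        percent = int(100 * frac)
        msg = '\r{0}[{1:<{2}}] | {3}% Completed | {4}'.format(
            self._label, bar, self._width, percent, format_time(elapsed)
        )
        try:
            self._file.write(msg)
            self._file.flush()
        except ValueError:
            pass

# usage, e.g. around one module's compute():
with LabeledProgressBar(label='Transforming: '):
    cleaned_df = data_transformation_module(df)

The catch is that a fixed label per instance brings back exactly the per-operation with blocks I am trying to keep out of the computational modules, which is why (2) matters.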
We tried to look at this by cloning the ProgressBar code to see whether we could pull a suitable string out of the task graph objects (for example from self._state here), but without success.
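To make (2) concrete, the closest we got was the idea of hooking the callback's _start_state method and guessing a label from the task-key prefixes. Another sketch, building on LabeledProgressBar above and assuming even more internals: the _start_state hook of the Callback API, the layout of the scheduler state (state['ready'] and state['waiting'] hold task keys), and dask.utils.key_split, which reduces a key like ('map_partitions-<token>', 0) to 'map_partitions'.

from collections import Counter

from dask.utils import key_split

class GuessingProgressBar(LabeledProgressBar):
    """Sketch: guess the label from the most common task-key prefix."""

    def _start_state(self, dsk, state):
        # Count key prefixes across the not-yet-finished tasks and use
        # the most common one as the label
        prefixes = Counter(
            key_split(key)
            for key in list(state['ready']) + list(state['waiting'])
        )
        if prefixes:
            self._label = prefixes.most_common(1)[0][0] + ': '

Even if this works, the guessed label would be something generic like 'map_partitions' rather than 'Transforming', so explicitly naming each operation still seems like the more useful variant of (2).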