I noticed that when I run my code with the processes
scheduler, it executes the partitions sequentially instead of in parallel,
whereas the threads scheduler runs them concurrently. The code is below:
Input Code
import tracemalloc
import time
from time import perf_counter

import dask
import dask.dataframe as dd
import pandas as pd
from tqdm.auto import tqdm
from functools import wraps


def measure_performance(func):
    """Measure performance of a function"""
    @wraps(func)
    def wrapper(*args, **kwargs):
        tracemalloc.start()
        start_time = perf_counter()
        result = func(*args, **kwargs)
        current, peak = tracemalloc.get_traced_memory()
        finish_time = perf_counter()
        print(f'Function: {func.__name__}')
        print(f'Method: {func.__doc__}')
        print(f'Memory usage:\t\t {current / 10 ** 6:.6f} MB \n'
              f'Peak memory usage:\t {peak / 10 ** 6:.6f} MB ')
        print(f'Time elapsed in seconds: {finish_time - start_time:.6f}')
        print(f'{"-" * 40}')
        tracemalloc.stop()
        return result
    return wrapper


@measure_performance
def dask_fn(df: pd.DataFrame) -> pd.DataFrame:
    # Row-by-row work with an artificial 2-second delay per row
    for _, row in tqdm(df.iterrows(), total=len(df)):
        time.sleep(2)
        df.loc[_, 'C'] = row['A'] + row['B']
        df.loc[_, 'D'] = df.loc[_, 'C'] ** 2
    return df


@measure_performance
def main(npartitions: int = 2):
    print(dask.config.get('scheduler', 'NA'))
    # Sample pandas DataFrame
    df = pd.DataFrame({
        'A': range(10),
        'B': range(10, 20)
    })
    ddf = dd.from_pandas(df, npartitions=npartitions)
    # Apply the custom function to each partition using map_partitions
    result_ddf = ddf.map_partitions(dask_fn, meta=pd.DataFrame(columns=['A', 'B', 'C', 'D']))
    # Trigger computation and display the result
    print("Processing...")
    result = result_ddf.compute()
    print("Completed!")
    print(result)


if __name__ == "__main__":
    with dask.config.set(scheduler='processes'):
        main(2)
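For reference, a minimal way to check where each partition actually runs would be to print the process ID inside the partition function. This is just a sketch (the name dask_fn_with_pid is illustrative, not part of the script above): with the processes scheduler each partition should report a different PID, while with the threads scheduler the PID is shared and only the thread differs.

import os
import threading
import pandas as pd

def dask_fn_with_pid(df: pd.DataFrame) -> pd.DataFrame:
    # Report which process and thread handles this partition. Under the
    # processes scheduler each partition should print a different PID;
    # under the threads scheduler the PID is shared and only the thread differs.
    print(f"partition: pid={os.getpid()} "
          f"thread={threading.current_thread().name} rows={len(df)}")
    return df

Passing dask_fn_with_pid to map_partitions in place of dask_fn (the frame is returned unchanged, so meta stays the input schema) would show directly whether the two partitions land in different processes.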
Output using the processes scheduler. Each of the two partitions took about 10 seconds, and main took about 21 seconds in total, so the partitions ran one after the other.
python dummy_dask.py
processes
Processing...
100%|██████████| 5/5 [00:10<00:00, 2.01s/it]
Function: dask_fn
Method: None
Memory usage: 0.195495 MB
Peak memory usage: 0.202708 MB
Time elapsed in seconds: 10.046859
----------------------------------------
100%|██████████| 5/5 [00:10<00:00, 2.01s/it]
Function: dask_fn
Method: None
Memory usage: 0.007188 MB
Peak memory usage: 0.014931 MB
Time elapsed in seconds: 10.034718
----------------------------------------
Completed!
A B C D
0 0 10 10.0 100.0
1 1 11 12.0 144.0
2 2 12 14.0 196.0
3 3 13 16.0 256.0
4 4 14 18.0 324.0
5 5 15 20.0 400.0
6 6 16 22.0 484.0
7 7 17 24.0 576.0
8 8 18 26.0 676.0
9 9 19 28.0 784.0
Function: main
Method: None
Memory usage: 11.423940 MB
Peak memory usage: 11.451597 MB
Time elapsed in seconds: 21.074527
----------------------------------------
Process finished with exit code 0
Output using the threads scheduler. Here main took only about 10 seconds in total, so the two partitions ran concurrently.
python dummy_dask.py
threads
Processing...
  0%|          | 0/5 [00:00<?, ?it/s]
  0%|          | 0/5 [00:00<?, ?it/s]dummy_dask.py:36: SettingWithCopyWarning:
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead
See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
df.loc[_, 'C'] = row['A'] + row['B']
dummy_dask.py:37: SettingWithCopyWarning:
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead
See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
df.loc[_, 'D'] = df.loc[_, 'C'] ** 2
 20%|██        | 1/5 [00:02<00:08, 2.02s/it]dummy_dask.py:36: SettingWithCopyWarning:
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead
See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
df.loc[_, 'C'] = row['A'] + row['B']
dummy_dask.py:37: SettingWithCopyWarning:
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead
See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
df.loc[_, 'D'] = df.loc[_, 'C'] ** 2
 40%|████      | 2/5 [00:04<00:06, 2.01s/it]
 60%|██████    | 3/5 [00:06<00:04, 2.01s/it]
 80%|████████  | 4/5 [00:08<00:02, 2.01s/it]
100%|██████████| 5/5 [00:10<00:00, 2.01s/it]
Function: dask_fn
Method: None
Memory usage: 11.412000 MB
Peak memory usage: 11.442180 MB
Time elapsed in seconds: 10.065535
----------------------------------------
100%|██████████| 5/5 [00:10<00:00, 2.02s/it]
Function: dask_fn
Method: None
Memory usage: 0.000000 MB
Peak memory usage: 0.000000 MB
Time elapsed in seconds: 10.088538
----------------------------------------
Completed!
A B C D
0 0 10 10.0 100.0
1 1 11 12.0 144.0
2 2 12 14.0 196.0
3 3 13 16.0 256.0
4 4 14 18.0 324.0
5 5 15 20.0 400.0
6 6 16 22.0 484.0
7 7 17 24.0 576.0
8 8 18 26.0 676.0
9 9 19 28.0 784.0
Function: main
Method: None
Memory usage: 0.000000 MB
Peak memory usage: 0.000000 MB
Time elapsed in seconds: 10.362858
----------------------------------------
Process finished with exit code 0
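To rule out map_partitions, tqdm, and the decorator, a stripped-down check of the processes scheduler on its own could look like the sketch below (slow_task is my own illustrative test code, not part of the script above). If the scheduler parallelizes, the total should be close to 2 seconds rather than 4:

import time
import dask
from dask import delayed

@delayed
def slow_task(x):
    # Stand-in for one slow partition (dask_fn sleeps 2 s per row)
    time.sleep(2)
    return x

if __name__ == "__main__":
    start = time.perf_counter()
    # Two independent tasks, forced onto the processes scheduler
    dask.compute(slow_task(1), slow_task(2), scheduler='processes')
    print(f"elapsed: {time.perf_counter() - start:.1f} s")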
Package versions
dask 2024.7.0
dask-expr 1.1.7
dask-kubernetes 2024.4.0
distributed 2024.7.0