Why is 'processes' executing my code sequentially?

I noticed that when I run my code using the processes scheduler, it runs the code sequentially.

Attached is the code below:

Input Code

import tracemalloc
import time
from time import perf_counter
import dask
import dask.dataframe as dd
import pandas as pd
from tqdm.auto import tqdm
from functools import wraps


def measure_performance(func):
    """Measure performance of a function"""

    @wraps(func)
    def wrapper(*args, **kwargs):
        tracemalloc.start()
        start_time = perf_counter()
        result = func(*args, **kwargs)
        current, peak = tracemalloc.get_traced_memory()
        finish_time = perf_counter()
        print(f'Function: {func.__name__}')
        print(f'Method: {func.__doc__}')
        print(f'Memory usage:\t\t {current / 10 ** 6:.6f} MB \n'
              f'Peak memory usage:\t {peak / 10 ** 6:.6f} MB ')
        print(f'Time elapsed is seconds: {finish_time - start_time:.6f}')
        print(f'{"-" * 40}')
        tracemalloc.stop()
        return result

    return wrapper

@measure_performance
def dask_fn(df: pd.DataFrame) -> pd.DataFrame:
    for _, row in tqdm(df.iterrows(), total=len(df)):
        time.sleep(2)
        df.loc[_, 'C'] = row['A'] + row['B']
        df.loc[_, 'D'] = df.loc[_, 'C'] ** 2
    return df

@measure_performance
def main(npartitions: int = 2):
    print(dask.config.get('scheduler', 'NA'))
    # Sample pandas DataFrame
    df = pd.DataFrame({
        'A': range(10),
        'B': range(10, 20)
    })
    ddf = dd.from_pandas(df, npartitions=npartitions)
    # Apply the custom function to each partition using map_partitions
    result_ddf = ddf.map_partitions(dask_fn, meta=pd.DataFrame(columns=['A', 'B', 'C', 'D']))

    # Trigger computation and display the result
    print("Processing...")
    result = result_ddf.compute()
    print("Completed!")
    print(result)

if __name__ == "__main__":
    with dask.config.set(scheduler='processes'):
        main(2)

Output using the processes scheduler.

python dummy_dask.py 
processes
Processing...
100%|██████████| 5/5 [00:10<00:00,  2.01s/it]
Function: dask_fn
Method: None
Memory usage:		 0.195495 MB 
Peak memory usage:	 0.202708 MB 
Time elapsed is seconds: 10.046859
----------------------------------------
100%|██████████| 5/5 [00:10<00:00,  2.01s/it]
Function: dask_fn
Method: None
Memory usage:		 0.007188 MB 
Peak memory usage:	 0.014931 MB 
Time elapsed is seconds: 10.034718
----------------------------------------
Completed!
   A   B     C      D
0  0  10  10.0  100.0
1  1  11  12.0  144.0
2  2  12  14.0  196.0
3  3  13  16.0  256.0
4  4  14  18.0  324.0
5  5  15  20.0  400.0
6  6  16  22.0  484.0
7  7  17  24.0  576.0
8  8  18  26.0  676.0
9  9  19  28.0  784.0
Function: main
Method: None
Memory usage:		 11.423940 MB 
Peak memory usage:	 11.451597 MB 
Time elapsed is seconds: 21.074527
----------------------------------------

Process finished with exit code 0

Output using the threads scheduler.

python dummy_dask.py 
threads
Processing...
  0%|          | 0/5 [00:00<?, ?it/s]
  0%|          | 0/5 [00:00<?, ?it/s]dummy_dask.py:36: SettingWithCopyWarning: 
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df.loc[_, 'C'] = row['A'] + row['B']
dummy_dask.py:37: SettingWithCopyWarning: 
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df.loc[_, 'D'] = df.loc[_, 'C'] ** 2
 20%|██        | 1/5 [00:02<00:08,  2.02s/it]dummy_dask.py:36: SettingWithCopyWarning: 
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df.loc[_, 'C'] = row['A'] + row['B']
dummy_dask.py:37: SettingWithCopyWarning: 
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df.loc[_, 'D'] = df.loc[_, 'C'] ** 2

 40%|████      | 2/5 [00:04<00:06,  2.01s/it]
 60%|██████    | 3/5 [00:06<00:04,  2.01s/it]
 80%|████████  | 4/5 [00:08<00:02,  2.01s/it]
100%|██████████| 5/5 [00:10<00:00,  2.01s/it]
Function: dask_fn
Method: None
Memory usage:		 11.412000 MB 
Peak memory usage:	 11.442180 MB 
Time elapsed is seconds: 10.065535
----------------------------------------

100%|██████████| 5/5 [00:10<00:00,  2.02s/it]
Function: dask_fn
Method: None
Memory usage:		 0.000000 MB 
Peak memory usage:	 0.000000 MB 
Time elapsed is seconds: 10.088538
----------------------------------------
Completed!
   A   B     C      D
0  0  10  10.0  100.0
1  1  11  12.0  144.0
2  2  12  14.0  196.0
3  3  13  16.0  256.0
4  4  14  18.0  324.0
5  5  15  20.0  400.0
6  6  16  22.0  484.0
7  7  17  24.0  576.0
8  8  18  26.0  676.0
9  9  19  28.0  784.0
Function: main
Method: None
Memory usage:		 0.000000 MB 
Peak memory usage:	 0.000000 MB 
Time elapsed is seconds: 10.362858
----------------------------------------

Process finished with exit code 0

Package versions

dask                               2024.7.0
dask-expr                          1.1.7
dask-kubernetes                    2024.4.0
distributed                        2024.7.0

Why does it seem like the processes scheduler is executing code sequentially?

I don't see any cluster setup, though you can run multi-process or multi-threaded logic without setting up a distributed cluster.

  • Without a distributed setup, Dask will still run on your local machine, using either threads (for multi-threading) or processes (for multi-processing), depending on the scheduler configuration.
  • Since you set scheduler='processes', Dask is expected to run the partitions in parallel using multiple processes, not purely sequentially. However, certain operations may make it appear sequential.

I'm not so confident it's purely sequential:

The primary reason for the apparent sequential behavior is the use of iterrows(), which processes data row by row, making the operation inherently sequential within each partition.

Additionally, the time.sleep(2) call makes this behavior even more pronounced. Since each row is processed sequentially, adding a 2-second delay per row gives the illusion that the entire operation is running sequentially, even though Dask is distributing partitions to different processes or threads.

You are using 2 partitions with 10 rows in total. This means each partition has 5 rows. While Dask is distributing the work between the two partitions:

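  • In the processes scheduler, these partitions are dispatched to different processes, and each partition takes about 10 seconds (2 seconds per row for 5 rows). If both ran at the same time, the total would be around 10 seconds. Note, though, that the posted run reports about 21 seconds for main, so the two partitions did not overlap in that run.
  • Similarly, in the threads scheduler, Dask processes the partitions in parallel using threads, and each partition again takes about 10 seconds due to the sequential nature of iterrows() and the time.sleep() delay. The posted run reports about 10 seconds for main, which matches both partitions running at the same time.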
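The arithmetic behind these timings can be written down as a small model. This is a rough sketch under simplifying assumptions (equal-sized partitions, a fixed cost per row, no scheduler overhead); `expected_wall_time` is a hypothetical helper, not part of Dask.

```python
import math


def expected_wall_time(n_rows: int, npartitions: int,
                       seconds_per_row: float, workers: int) -> float:
    """Estimate wall-clock time when each row costs a fixed amount of time."""
    rows_per_partition = math.ceil(n_rows / npartitions)
    partition_time = rows_per_partition * seconds_per_row
    # Partitions run in "waves" of at most `workers` partitions at a time.
    waves = math.ceil(npartitions / workers)
    return waves * partition_time


# 10 rows, 2 partitions, 2 seconds per row, as in the posted script.
both_at_once = expected_wall_time(10, 2, 2.0, workers=2)  # 10.0 seconds
one_at_a_time = expected_wall_time(10, 2, 2.0, workers=1)  # 20.0 seconds
print(both_at_once, one_at_a_time)
```

Compared with the posted outputs, the threads run (about 10.4 seconds for main) is close to the two-worker estimate, while the processes run (about 21.1 seconds for main) is close to the one-worker estimate.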

In general, using iterrows() is considered bad practice in Pandas and Dask because it processes rows one at a time and is slow. It's better to avoid it, and removing the time.sleep() call would also cut the run time.
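As an illustration, the row loop in dask_fn could be replaced by column arithmetic. This is a minimal sketch, assuming the goal is only to compute C = A + B and D = C ** 2. `add_columns` is a hypothetical name, and the sleep and progress bar are left out.

```python
import pandas as pd


def add_columns(df: pd.DataFrame) -> pd.DataFrame:
    """Compute C and D for all rows at once instead of looping with iterrows()."""
    # assign() returns a new DataFrame, so the input partition is not modified
    # in place; later keyword arguments can refer to columns created earlier.
    return df.assign(
        C=lambda d: d["A"] + d["B"],
        D=lambda d: d["C"] ** 2,
    )


df = pd.DataFrame({"A": range(10), "B": range(10, 20)})
result = add_columns(df)
print(result)
```

Because the function returns a new frame rather than writing into the one it was given, it should also avoid the SettingWithCopyWarning seen in the threads output.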

When working with Dask, keep in mind:

  1. Do you really need Dask? If your dataset fits into memory, it's often more efficient to use Pandas directly. If you do want Dask, use map_partitions, map_blocks, etc.
  2. Prefer Dask's built-in functionality: use built-in methods and vectorized operations wherever possible to take full advantage of parallelism. Only fall back to Pandas when necessary, and always avoid row-wise operations like iterrows() in favor of columnar, vectorized operations.