I’m currently investigating the performance of a simple dask.array workflow that just computes the standard deviation of an array (using the distributed scheduler). To understand the behavior, I used a single node and tested 2 different configurations:
1 worker with 12 threads (thread-based parallelism)
12 workers with 1 thread each (process-based parallelism)
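For anyone who wants to reproduce this on a single machine, a roughly equivalent setup can be created with LocalCluster. This is only a sketch; my actual runs start the workers with dask-worker as shown further down:

from dask.distributed import Client, LocalCluster

# thread-based parallelism: 1 worker process with 12 threads
cluster = LocalCluster(n_workers=1, threads_per_worker=12)
# process-based parallelism: 12 worker processes with 1 thread each
# cluster = LocalCluster(n_workers=12, threads_per_worker=1)
client = Client(cluster)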
Even though the result is correct in both cases, memory seems to be released quite quickly with thread-based parallelism, whereas with process-based parallelism the amount of memory tagged as “in-memory” in the dashboard grows continuously until about 80% of the computation and then falls quickly.
Moreover, in the multi-process case, the scheduling looks quite inefficient (large white areas between tasks, …).
[Dashboard screenshots: top) process-based parallelism, bottom) thread-based parallelism]
Thanks for the question @smartin! It’s possible what you’re seeing is related to this behavior, and you could investigate by manually trimming memory. Would you also be able to share a minimal reproducible example of the calculation you mentioned? Then we could dig into the scheduling differences you showed in the screenshots as well.
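For reference, manually trimming memory typically means asking glibc to return freed memory to the OS via malloc_trim on each worker. A minimal sketch, assuming a Linux/glibc environment and an already connected Client object named client (it would not apply as-is to Windows workers):

import ctypes

def trim_memory() -> int:
    # ask glibc's allocator to hand freed memory back to the OS
    libc = ctypes.CDLL("libc.so.6")
    return libc.malloc_trim(0)

# run the trim on every worker connected to the scheduler
client.run(trim_memory)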
Thanks very much for your response and suggestions.
The memory is correctly released at the end of the computation, so trimming memory should not help me. My main concern is that I cannot figure out why the memory footprint (during the computation) and the scheduling performance are so different in the 2 scenarios (thread-based vs process-based).
As you can see in the screenshots, there are 3232 “in-memory items” in the first case (12 processes, at about 80% of the computation) while there are never more than 100 “in-memory items” in the second case (thread-based). Objects seem to be released continuously in the thread-based approach, while they are mainly released at the end of the computation in the process-based approach.
I suspect that the task graph is being processed breadth-first rather than depth-first. Note that I am investigating these scenarios because I plan to use Dask to schedule image processing tasks that cannot run across multiple threads.
Here is some sample code that reproduces the issue:
from dask import array
data = array.random.random((82944, 126976, 3), chunks=(1024, 1024, 3))
std = data.std()
std.compute()
To start workers in a thread-based way: dask-worker.exe tcp://192.168.0.104:8786 --nprocs=1 --nthreads=12
To start workers in a process-based way: dask-worker.exe tcp://192.168.0.104:8786 --nprocs=12 --nthreads=1
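Note that for the sample code to actually run on those workers, a client has to be connected to the scheduler before calling compute(); a minimal sketch, assuming the scheduler is listening at the same address as in the commands above:

from dask.distributed import Client

# connect to the running scheduler; otherwise compute() falls back to the
# default local scheduler instead of the dask-worker processes started above
client = Client("tcp://192.168.0.104:8786")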
If you scale down your example, you can look at the task graph and see the scheduler take a different approach in the two scenarios. Reading this page on scheduling decisions sheds some light on what the task graph is showing, and it seems your suspicion is correct that depth-first scheduling is possible with the thread-based approach. This depth-first scheduling allows intermediate results to be cleared more quickly.
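For example, a scaled-down version of the same computation can be rendered with .visualize() (only a sketch; the smaller shape is arbitrary and graphviz must be installed):

from dask import array

# much smaller array with the same chunking pattern, so the graph stays readable
small = array.random.random((4096, 4096, 3), chunks=(1024, 1024, 3))
small.std().visualize(filename="std-graph.svg")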