Some operations in Dask's computation graph are not reflected in the profiling tools (e.g., "sin", "log", "slice")

Hello,

Profiling tools like Dask’s “Performance Report”, “Profiler”, and fine performance metrics fail to display certain operations in the task section. This makes it challenging to identify bottlenecks in my workflow.

Below, I try to mimic my real use case, which roughly looks like this:

create Dask graphs for different variables => persist the largest computations =>
keep adding operations to the variables => write to disk (or call compute)

  1. Step to reproduce: first steps
from dask.distributed import performance_report, LocalCluster, Client
import dask.array as da
from dask.diagnostics import Profiler

cluster = LocalCluster(
        n_workers=1,
        threads_per_worker=2, 
        processes=False,
        memory_limit="20gb",  # can be changed
    )

client = Client(cluster)
print(client.dashboard_link)

array = da.random.random((10, 10), chunks=(5, 5))

# simulate a deep graph
for _ in range(20):
    array = da.abs(da.sin(array+100))

# visualize: the graph is fine
array.visualize("intermediate.png")

# Another array is created
array2 = array[::2]**3

# Intermediate persist
array = array.persist()

Expected behavior: I expected to see information about the time required to compute da.abs, da.sin and the + operation.

Obtained behavior: the fine performance metrics don’t show any sign of da.sin or the + operator. Similarly, these operations are absent from the “task” section.

  2. Step to reproduce: continue to process data
# Other computations
for _ in range(20):
    array = da.mod(da.log(array/20), 10)

# visualize: the graph is fine
array.visualize("final.png")

array = array.compute()

Expected behavior: I expected to see timing information about da.log, da.mod and the / operator.

Obtained behavior: the fine performance metrics again only show the mod (“remainder”) operation, but don’t show any timing for da.log or the / operator.

  3. Final compute

array2 = array2.compute()

Expected behavior: I expected to see pow and slice in the metrics.

Obtained behavior: the “pow” operation is here, but the “slice” operation is absent.

What I tried:

  1. Dask profiler
with Profiler() as prof:
    array = array.compute()
prof.visualize()

yields an empty result.

  2. Dask performance report
    The profile sections of the performance report are empty with the code used above.
Code to reproduce the issue with the performance report
from dask.distributed import performance_report, LocalCluster, Client
import dask.array as da

cluster = LocalCluster(
        n_workers=1,  # Number of workers
        threads_per_worker=2,  # Threads per worker
        processes=False,  # Run the worker as threads in this process (default is True)
        memory_limit="20gb",
    )

# Link the cluster to a client
client = Client(cluster)
print(client.dashboard_link)
with performance_report(filename="forum_discussion.html"):


    array = da.random.random((10, 10), chunks=(5, 5))

    # simulate a deep graph
    for _ in range(20):
        array = da.abs(da.sin(array+100))

    # visualize: the graph is fine
    array.visualize("intermediate.png")

    # Another array is created
    array2 = array[::2]**3

    # Intermediate persist
    array = array.persist()

    # Other computations
    for _ in range(20):
        array = da.mod(da.log(array/20), 10)

    # visualize: the graph is fine
    array.visualize("final.png")

    array = array.compute()
    array2 = array2.compute()

What could be tried:

  1. Classic profiler

Not feasible due to multi-threading/multi-processing?

Questions

  1. Are these tasks absent because of task fusion/optimization?
  2. Are there alternative tools for profiling Dask computations?
  3. Is this the right way to profile code with intermediate .persist / .compute calls?
  4. Is this a known limitation or a potential bug?

In my real workflow (processing satellite data), the NA tasks shown in the Fine Performance Metrics grow very large, and the profiler sections are not empty but extremely reduced (much smaller than what is actually computed). This makes it hard to identify slow sections in the code and optimize them.

Thanks for any potential help (and for this nice forum)!

Hi @louislt,

First of all, I’m not an expert in all this, so cc @crusaderky.

This profiler cannot be used with a distributed cluster, only with local ones.

You should try with something bigger; I don’t see anything in the Dashboard tab either.

You could try in a single-threaded Dask context if your tests are small enough.

I believe so: metrics are only gathered for tasks actually executed on workers, and in the task stream view I can only see the tasks that are in the profiles.

Yes, at the task level.

Hi Guillaume,

thanks for your valuable help!

This profiler cannot be used with a distributed cluster, only with local ones.

I’ll keep in mind that the profiler is suited to local clusters. Theoretically, it should then be compatible with the example above, right (the cluster is local and multi-threaded)?

You should try with something bigger; I don’t see anything in the Dashboard tab either.

You could try in a single-threaded Dask context if your tests are small enough.

I investigated with various matrix sizes and graph depths, and switched back to a single-threaded scheduler.

New configuration:

Sizes (larger): da.random.random((10_000, 10_000), chunks=(1_000, 1_000))
Graph depth (shallower): for _ in range(10):
Time to execute the example: ~1min on my laptop.

Fine Performance metrics and performance report:

Issue: I expected to find information about da.mod and da.sin, but cannot find it, neither in the Tasks Stream and profile sections of the performance report nor in the Fine Performance Metrics section of the dashboard.

Code
from dask.distributed import LocalCluster, Client, performance_report
import dask.array as da

cluster = LocalCluster(
        n_workers=1,
        threads_per_worker=1, 
        processes=False,
        memory_limit="20gb",  # can be changed
    )

client = Client(cluster)
print(client.dashboard_link)

with performance_report(filename="forum_discussion_larger.html"):
    array = da.random.random((10_000, 10_000), chunks=(1_000, 1_000))

    # simulate a deep graph
    for _ in range(10):
        array = da.abs(da.sin(array+100))

    # Another array is created
    array2 = array[::2]**3

    # Intermediate persist
    array = array.persist()

    # Other computations
    for _ in range(10):
        array = da.mod(da.log(array/20), 10)

    array = array.compute()

    array2 = array2.compute()

Result:

Cprofile and snakeviz

Issue: similar to the above. I can’t find information about the time spent in each function (everything is lumped into “method ‘acquire’ of ‘thread_lock’ objects”). There is no sign of da.sin, for example.

Code:
import cProfile

from dask.distributed import LocalCluster, Client
import dask.array as da

cluster = LocalCluster(
        n_workers=1,
        threads_per_worker=1, 
        processes=False,
        memory_limit="20gb",  # can be changed
    )

client = Client(cluster)
print(client.dashboard_link)

with cProfile.Profile() as pr:
    array = da.random.random((10_000, 10_000), chunks=(1_000, 1_000))

    # simulate a deep graph
    for _ in range(10):
        array = da.abs(da.sin(array+100))

    # Another array is created
    array2 = array[::2]**3

    # Intermediate persist
    array = array.persist()

    # Other computations
    for _ in range(10):
        array = da.mod(da.log(array/20), 10)

    # with cProfile.Profile() as pr:
    array = array.compute()

    array2 = array2.compute()

pr.dump_stats("discussion_dask_forum.prof")

Then run snakeviz discussion_dask_forum.prof in a terminal.

Result:

When profiling only one compute call:

with cProfile.Profile() as pr:
    array = array.compute()

we obtain similar results.

map_blocks

Interestingly, with map_blocks, the function f appears in the dashboard (but, as expected, not its details).

def f(array):
    return np.mod(np.log(array/20), 10)

Code
from dask.distributed import LocalCluster, Client, performance_report
import dask.array as da
import numpy as np

cluster = LocalCluster(
        n_workers=1,
        threads_per_worker=1, 
        processes=False,
        memory_limit="20gb",  # can be changed
    )

client = Client(cluster)
print(client.dashboard_link)

def f(array):
    return np.mod(np.log(array/20), 10)

with performance_report(filename="forum_discussion_larger_map_blocks.html"):
    array = da.random.random((10_000, 10_000), chunks=(1_000, 1_000))

    # simulate a deep graph
    for _ in range(10):
        array = da.abs(da.sin(array+100))

    # Another array is created
    array2 = array[::2]**3

    # Intermediate persist
    array = array.persist()

    # Other computations
    for _ in range(10):
        array = da.map_blocks(f, array, dtype=array.dtype)

    array = array.compute()

    array2 = array2.compute()
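Building on this observation, map_blocks accepts a name= argument that lets you choose the output key name, and the key prefix is what the dashboard groups tasks by. A minimal sketch, where abs_sin and log_mod are hypothetical labels chosen for illustration and the local synchronous scheduler is used only to keep the example self-contained:

```python
import numpy as np
import dask.array as da

def abs_sin(block):
    # Same maths as da.abs(da.sin(array + 100)), applied to one NumPy block
    return np.abs(np.sin(block + 100))

def log_mod(block):
    # Same maths as da.mod(da.log(array / 20), 10), applied to one NumPy block
    return np.mod(np.log(block / 20), 10)

data = np.random.random((10, 10))
array = da.from_array(data, chunks=(5, 5))

# name= controls the output key name, i.e. the label the dashboard shows for these tasks
step1 = da.map_blocks(abs_sin, array, dtype=array.dtype, name="abs_sin")
step2 = da.map_blocks(log_mod, step1, dtype=step1.dtype, name="log_mod")

result = step2.compute(scheduler="synchronous")
print(step1.name, step2.name)
```

Note that two consecutive map_blocks layers can themselves be fused by the optimizer, in which case only the last label (log_mod here) may appear unless the graph is computed with optimize_graph=False.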

By local, I mean the single-machine scheduler, as described in this page. A LocalCluster is actually an implementation of a distributed cluster.

This is due to the above explanation. If you want to use this kind of profiling tool, only a single-threaded scheduler will work. Otherwise, you are profiling the Client process, which just waits for results.
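A minimal sketch of that advice, assuming the goal is to see per-function times in cProfile/snakeviz: scheduler="synchronous" runs every task in the calling thread, which is the only thread cProfile observes. NumPy ufuncs such as np.sin may not show up as separate cProfile entries (their time tends to be attributed to the calling Python function), so the steps are wrapped in named Python functions with map_blocks; abs_sin and log_mod are illustrative names and the array size is arbitrary.

```python
import cProfile
import pstats

import numpy as np
import dask.array as da

def abs_sin(block):
    return np.abs(np.sin(block + 100))

def log_mod(block):
    return np.mod(np.log(block / 20), 10)

array = da.random.random((2_000, 2_000), chunks=(500, 500))
for _ in range(5):
    array = da.map_blocks(abs_sin, array, dtype=array.dtype)
for _ in range(5):
    array = da.map_blocks(log_mod, array, dtype=array.dtype)

with cProfile.Profile() as pr:
    # All tasks run in this thread, so cProfile records them
    result = array.compute(scheduler="synchronous")

stats = pstats.Stats(pr)
# Only print the lines matching the wrapped steps
stats.sort_stats("cumulative").print_stats("abs_sin|log_mod")

# stats.stats is keyed by (filename, line number, function name)
profiled_names = {func_name for (_, _, func_name) in stats.stats}
```

The same pr.dump_stats(...) call used in the question then produces a file that snakeviz can open.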

In the end, I think it all comes down to the fact that only a single task is executed for all your function calls, and profiling in Dask distributed works at the task level.
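Since the distributed profiling works per task, one way to get per-operation timings from a LocalCluster is to record the task stream and run the measurement with optimize_graph=False (as suggested in the reply below), so that log, truediv (the / operator) and remainder (da.mod) stay separate tasks. A minimal sketch with dask.distributed.get_task_stream; time_by_prefix is a hypothetical helper, and since an unfused graph adds scheduling overhead, only the per-prefix compute times are meaningful here, not the total wall time.

```python
from collections import defaultdict

import dask.array as da
from dask.distributed import Client, LocalCluster, get_task_stream
from dask.utils import key_split

def time_by_prefix(task_stream_data):
    # Sum the "compute" intervals of each task, grouped by task prefix (e.g. "log")
    totals = defaultdict(float)
    for record in task_stream_data:
        prefix = key_split(record["key"])
        for interval in record["startstops"]:
            if interval["action"] == "compute":
                totals[prefix] += interval["stop"] - interval["start"]
    return dict(totals)

cluster = LocalCluster(n_workers=1, threads_per_worker=1, processes=False)
client = Client(cluster)

array = da.random.random((2_000, 2_000), chunks=(500, 500))
for _ in range(3):
    array = da.mod(da.log(array / 20), 10)

with get_task_stream() as ts:
    # optimize_graph=False keeps truediv, log and remainder as separate tasks
    array.compute(optimize_graph=False)

totals = time_by_prefix(ts.data)
for prefix, seconds in sorted(totals.items(), key=lambda kv: -kv[1]):
    print(f"{prefix:>14s} {seconds:.3f} s")

client.close()
cluster.close()
```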

Hi @louislt ,

You’re not seeing the individual operations because the dask optimizer fuses multiple tasks into one when it is beneficial for performance.
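This also explains why “remainder” (da.mod) and “pow” were the only names that showed up: when a chain of elementwise layers is fused, the resulting task keeps the key name of the last layer, and the dashboard groups tasks by key prefix. A small sketch to inspect this with dask.optimize and dask.utils.key_split; the helper prefixes is hypothetical, and the exact optimized graph may vary between Dask versions.

```python
import dask
import dask.array as da
from dask.utils import key_split

array = da.random.random((10, 10), chunks=(5, 5))
for _ in range(3):
    array = da.mod(da.log(array / 20), 10)
array2 = array[::2] ** 3

def prefixes(collection, optimize):
    # Task prefixes (e.g. "log", "remainder") present in the graph Dask would run
    if optimize:
        (collection,) = dask.optimize(collection)
    return {key_split(key) for key in collection.__dask_graph__()}

print(key_split(array.name), key_split(array2.name))
print("raw:      ", sorted(prefixes(array2, optimize=False)))
print("optimized:", sorted(prefixes(array2, optimize=True)))
```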

If you run array.compute(optimize_graph=False), you will see the individual operations.
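The effect of optimize_graph=False can also be checked locally by counting the executed tasks with dask.diagnostics.Profiler on the thread scheduler. This is a sketch; the exact counts depend on the Dask version and on the chunking.

```python
import dask.array as da
from dask.diagnostics import Profiler

array = da.random.random((1_000, 1_000), chunks=(250, 250))
for _ in range(10):
    array = da.mod(da.log(array / 20), 10)

def executed_tasks(optimize_graph):
    # Run on the local thread pool and return the profiled task records
    with Profiler() as prof:
        array.compute(scheduler="threads", optimize_graph=optimize_graph)
    return prof.results

fused = executed_tasks(optimize_graph=True)
unfused = executed_tasks(optimize_graph=False)
print(len(fused), "tasks with optimization,", len(unfused), "without")
```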

Particularly for arrays with small chunks, disabling the optimizer may heavily skew your overall performance profile. In the fine performance metrics, you will see that “thread-cpu” and “thread-noncpu” remain the same with and without the optimizer, whereas everything else (Dask overhead) will balloon.

In order to minimize the impact of the optimizer, you should run your test on larger chunks. The default size when you don’t specify the chunks= parameter (~100 MB) is a good starting point.
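A quick way to compare the chunks used earlier in this thread with the automatic ones. This is a sketch; the “auto” size comes from the array.chunk-size configuration value, whose exact default depends on the Dask version.

```python
import dask
import dask.array as da
from dask.utils import format_bytes

shape = (10_000, 10_000)

small = da.random.random(shape, chunks=(1_000, 1_000))  # chunks used in the thread
auto = da.random.random(shape)  # chunks="auto", sized from array.chunk-size

def chunk_bytes(array):
    # Size in bytes of the largest chunk
    n = 1
    for dim in array.chunksize:
        n *= dim
    return n * array.dtype.itemsize

print("array.chunk-size:", dask.config.get("array.chunk-size"))
for label, arr in [("explicit", small), ("auto", auto)]:
    print(label, arr.chunksize, arr.npartitions, format_bytes(chunk_bytes(arr)))
```

Fewer, larger chunks mean fewer tasks, so the per-task overhead weighs less in the fine performance metrics.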