Measuring the overall profile of long runs

I am using Dask Distributed to run large numbers (thousands) of dynamically generated tasks. There are different types of tasks, each corresponding to a function in my code. The total runtime of the calculations is on the order of hours to days, spread over multiple nodes with Dask-jobqueue.

What I want to do is get a breakdown of the total time spent in each function. The profile page on the dashboard does exactly what I want, but it seems to be limited to only one hour of data, which is far too short. The HTML files created with client.profile don’t contain more data either. Is there a simple way to record a global profile?
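
For reference, the profile files are produced roughly like this at the end of each “cycle” (a simplified sketch, with a LocalCluster standing in for the dask-jobqueue cluster I actually use):

from dask.distributed import Client, LocalCluster

# LocalCluster stands in here for the dask-jobqueue cluster of the real runs
cluster = LocalCluster(n_workers=1, threads_per_worker=4, processes=False)
client = Client(cluster)

client.submit(sum, [1, 2, 3]).result()        # stand-in for one batch of real tasks
client.profile(filename="dask-profile.html")  # one such HTML file is written per cycle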

Hi @RaphaelRobidas, welcome back!

Did you try all the possibilities described here?
https://docs.dask.org/en/stable/diagnostics-distributed.html#capture-diagnostics

If even the performance report doesn’t contain more than one hour of data, then this is a problem. Did you also try to use the start kwarg of Client.profile?
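
For example, something along these lines (an untested sketch of the capture options from that page):

from dask.distributed import Client, performance_report, get_task_stream

client = Client()  # or the Client connected to your dask-jobqueue cluster

# Capture everything that happens inside the block into a standalone HTML report
with performance_report(filename="dask-report.html"):
    client.submit(sum, [1, 2, 3]).result()

# Save the task stream of a block of work to its own HTML file
with get_task_stream(plot="save", filename="task-stream.html") as ts:
    client.submit(sum, [1, 2, 3]).result()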

Hello @guillaumeeb,

Thanks for the reply.

I am also saving the task streams, but they are not so convenient for what I’m trying to measure, and they don’t seem to contain all the tasks since the start of the job. I write the diagnostics every “cycle” in my code, so it is possible that the task streams get flushed each time and would contain all the tasks when combined. In any case, I would need to add up all the task times to get the total runtime, and the result is not quite as detailed as the profile.

I added the start argument to Client.profile. The exact type is not specified; the documentation just says start: time. Since I couldn’t figure out the expected type, I tried a POSIX timestamp. The times in the profile files don’t add up to the reported compute time: a lot of time is missing. The total time in the profiles can either increase or decrease between cycles, so it clearly isn’t a global profile.
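
Concretely, the calls look roughly like this (a sketch; job_start_time is just an illustrative name for the timestamp I record when the script starts):

import time
from dask.distributed import Client

client = Client()  # the real runs use a dask-jobqueue cluster instead
job_start_time = time.time()  # POSIX timestamp recorded once, at the start of the run

client.submit(sum, range(10)).result()  # stand-in for one cycle of real tasks

# At the end of each cycle, ask for a profile covering everything since the start
client.profile(start=job_start_time, filename=f"dask-profile_{time.time():.0f}.html")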

The performance_report function also seems to write files with a very limited scope. The job duration in the report is 20 minutes, while the job ran for around 21 hours.

My code uses multiple clients in different threads. The actual jobs run completely fine this way, but could it be problematic for the diagnostics?

That might explain some problems. Do you profile every client, or use the performance_report context manager in every thread?

Maybe we should try with a simpler use case, only one Client, and see if it works for more than one hour of data?

Every cycle, each Client saves its profile to a unique file. The performance_report context manager is used in the main thread, which launches all the other threads and waits for them to finish. I will try the simpler case, as you suggest. In theory, should these diagnostics be saved in each thread, or in only one thread?
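
To make the structure concrete, here is a stripped-down sketch of the layout (illustrative only: LocalCluster replaces the dask-jobqueue cluster, and dummy tasks replace the real ones):

from threading import Thread
from dask.distributed import Client, LocalCluster, performance_report

def run_cycle(scheduler_address, thread_id, cycle):
    # Each thread owns its own Client, all connected to the same scheduler
    client = Client(scheduler_address)
    futures = client.map(lambda x: x ** 2, range(10), pure=False)
    client.gather(futures)
    # Every cycle, each Client dumps its own profile to a separate file
    client.profile(filename=f"profile_t{thread_id}_c{cycle}.html")
    client.close()

if __name__ == "__main__":
    cluster = LocalCluster(n_workers=1, threads_per_worker=4, processes=False)
    main_client = Client(cluster)
    # performance_report is opened once, in the main thread, around the whole run
    with performance_report(filename="dask-performance.html"):
        for cycle in range(2):
            threads = [
                Thread(target=run_cycle, args=(cluster.scheduler_address, i, cycle))
                for i in range(3)
            ]
            for t in threads:
                t.start()
            for t in threads:
                t.join()
    main_client.close()
    cluster.close()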

I would say that it has to be saved for every Client, but I’m really not sure about it.

Maybe we should try with a simpler use case, only one Client, and see if it works for more than one hour of data?

I tried it. The task stream in the performance report is fine and spans the whole runtime. However, the worker profile is missing a lot of compute time, and so are the Client.profile files when added together. It seems that this issue is more general than we thought.

Could you share a reproducer?

This code exhibits the problematic behavior: the profile in the dashboard has a window of only 1 hour. I updated all the dask-related packages just before running the script.

dask==2023.5.1
dask-jobqueue==0.8.1
import os
import glob
import time
import sys
import dask
import logging
import random

from dask.distributed import (
    Client,
    LocalCluster,
    get_client,
    as_completed,
    performance_report,
)

NUM_PROCESSES = 16

logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)-8s %(message)s',)


class DummyManager:
    def run(self):
        logging.info("Starting the manager")

        # Initial batch of jobs: job n sleeps for n minutes
        jobs = list(range(1, 50))

        client = get_client()
        futs = []
        for j in jobs:
            futs.append(client.submit(self.job, j))

        asc = as_completed(futs, with_results=True)
        for fut, ret in asc:
            logging.info(f"Processing future {str(fut)}: ret={str(ret)}")
            if ret > 0:
                logging.info(f"Launching a subjob with time {ret}")
                asc.add(client.submit(self.job, ret))
            fut.release()


    def job(self, n):
        # Simulate n minutes of work; ~20% of the time, return a random duration
        # (up to 10 minutes) that spawns a follow-up job
        time.sleep(60 * n)

        if random.random() > 0.8:
            return round(random.random() * 10)
        return 0


if __name__ == "__main__":
    cluster = LocalCluster(
        n_workers=1,
        threads_per_worker=NUM_PROCESSES,
        processes=False,
    )

    client = Client(cluster)
    with performance_report(filename=f"dask-performance_{time.time():.0f}.html"):
        try:
            manager = DummyManager()
            manager.run()
        except KeyboardInterrupt:
            logging.info("Stopping the job...")
            cluster.close()
            exit(0)
    client.close()
    cluster.close()

Is the behavior reproducible? Newer versions of Dask seem to also have this same issue.

Hi @RaphaelRobidas, sorry for the delay.

Just tried your code.

Are you talking about the interactive Dashboard when the client is running, or the performance report profile?

In the performance report, this seems OK to me. What do you get, and what would you expect?

Hello @guillaumeeb,

Here is the profile after around 1h of runtime with the code above:

The profile registers 15hr 58m of compute time, which makes sense for 100% usage of 16 threads. For some reason, Compute Time Per Task and Aggregate Time Per Action both show 9hr 21m at that moment (although that isn’t my main issue).

At the end of the script execution (after 98m, 38m later), here is the worker profile in the report:

It registers only 10hr 46m of compute time, which is less than after only one hour. However, it would fit with an average number of running jobs less than 16 in the last hour, as the task stream shows:

The run summary however registers a compute time of 20hr 55m, which seems about right:

Why isn’t this 20hr 55m of total compute time broken down in the profile? For longer/bigger runs, the discrepancy is even more severe. For example, I have a run which reports 54d 22hr of compute time, but registers only 12hr 32m in the profile. It seems to me like the profile only has a window of 1 hour and does not consider the data outside that window.

The profile in the interactive dashboard while the job is still running has the same behaviour.

Okay, I ran a simpler test with 96 calculations of a fixed 15-minute duration. That is 96 × 15 min = 24 h of total compute time, so with 16 threads we would expect the run to take 90 minutes of wall time.

import os
import glob
import time
import sys
import dask
import logging
import random

from dask.distributed import (
    Client,
    LocalCluster,
    get_client,
    as_completed,
    performance_report,
)

NUM_PROCESSES = 16

logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)-8s %(message)s',)


class DummyManager:
    def run(self):
        logging.info("Starting the manager")

        # 96 jobs, each with a fixed 15-minute duration (see job below)
        jobs = list(range(1, 97))

        client = get_client()
        futs = []
        for j in jobs:
            futs.append(client.submit(self.job, j))

        asc = as_completed(futs, with_results=True)
        for fut, ret in asc:
            logging.info(f"Processing future {str(fut)}: ret={str(ret)}")
            if ret > 0:
                logging.info(f"Launching a subjob with time {ret}")
                asc.add(client.submit(self.job, ret))
            fut.release()


    def job(self, n):
        # Fixed 15-minute task; always returns 0, so no subjobs are ever spawned
        time.sleep(60 * 15)

        return 0


if __name__ == "__main__":
    cluster = LocalCluster(
        n_workers=1,
        threads_per_worker=NUM_PROCESSES,
        processes=False,
    )

    client = Client(cluster)
    with performance_report(filename=f"dask-performance_{time.time():.0f}.html"):
        try:
            manager = DummyManager()
            manager.run()
        except KeyboardInterrupt:
            logging.info("Stopping the job...")
            cluster.close()
            exit(0)
    client.close()
    cluster.close()

In the HTML report, the Worker profile still shows 15hr 59m as the total time, as before. However, the summary page reports a total runtime of 2hr 14m (instead of the expected 1hr 30m) and 35hr 44m of total compute time, which isn’t the expected 24hr either. The task stream mostly contains tasks running for 15m, but also one task per thread which runs for the last 59m of the job (?). I’m not quite sure what is happening here.

Can anyone reproduce this?

Hi @RaphaelRobidas,

Sorry we didn’t answer earlier…

I just ran your reproducer, and didn’t get the same result as you:

  • The summary page reports a 90 min runtime as expected, and 24 hrs of compute time.
  • The task stream contains only 15 min jobs, and spans the 90 min of execution.
  • The Worker profile shows 4h 25min, and I’m not sure what it represents…

I’m running with Dask 2024.2.1.

Hello @guillaumeeb,

Thanks for taking the time to look into the issue once more.

I ran my reproducer again with Dask 2024.4.2 and got the same as you (4h 26min instead of 4h 25min). My issue is with this last point. Dask doesn’t seem to be keeping all the data. For example, here is the profile at one point during the run:


30 minutes later:

The previous data has been lost and is no longer in the timeline at the bottom. The total time is still 4h 26min.

In the end, I developed workarounds for my specific purposes, but this is definitely confusing when looking at Dask analytics.
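
For anyone landing here with the same need, one possible workaround in this spirit (only a rough sketch with illustrative names, not the exact code I use) is to time each task function inside the task itself and aggregate the durations on the client side:

import time
from collections import defaultdict
from functools import wraps

def timed(func):
    # Wrap a task function so it returns (result, seconds spent) instead of just the result
    @wraps(func)
    def wrapper(*args, **kwargs):
        t0 = time.perf_counter()
        result = func(*args, **kwargs)
        return result, time.perf_counter() - t0
    return wrapper

# Client side: accumulate the total time per function name over the whole run
totals = defaultdict(float)

# Usage sketch:
#   futs = [client.submit(timed(self.job), j) for j in jobs]
#   for fut, (result, seconds) in as_completed(futs, with_results=True):
#       totals["job"] += seconds

This loses the per-line detail of the real profiler, but the per-function totals survive for the whole run.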

I agree that the Worker Profile is a bit disturbing; maybe it would make sense to open an issue on the distributed GitHub repository about this.

For posterity, here’s the GitHub issue: Worker profile limited to a short timespan · Issue #8653 · dask/distributed · GitHub