Computing bandwidth between pairs of workers

I’m seeking some advice on how to recompute the bandwidths between pairs of workers that are reported in the performance report. What I would like to see are all the transfers accounted for in that computation. For now I don’t see how to derive such metrics from the information reported in the workers’ logs.

Hi @orliac, welcome to Dask community!

Could you be more precise about the information you want to retrieve? Is it something that you can see on the Dashboard but are unable to get by yourself, and if so, which part?

Workers’ logs don’t give much information about this; however, you can find plenty of statistics through diverse means:

I can also give a simple code sample:

from distributed import Client

# create client
client = Client(n_workers=2, threads_per_worker=2, memory_limit='2GiB')

workers = client.cluster.scheduler.workers.values()
for ws in workers:
    print(ws.metrics['transfer'])

gives:

{'incoming_bytes': 0, 'incoming_count': 0, 'incoming_count_total': 0, 'outgoing_bytes': 0, 'outgoing_count': 0, 'outgoing_count_total': 0}
{'incoming_bytes': 0, 'incoming_count': 0, 'incoming_count_total': 0, 'outgoing_bytes': 0, 'outgoing_count': 0, 'outgoing_count_total': 0}
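If you want cluster-wide totals rather than per-worker counters, you can sum these dicts up. The helper below is just a sketch operating on plain dicts shaped like the output above, so it can be tried without a running cluster:

```python
def total_transfer_bytes(transfer_metrics):
    """Sum the incoming/outgoing byte counters over a list of per-worker
    'transfer' metric dicts like the ones printed above."""
    totals = {"incoming_bytes": 0, "outgoing_bytes": 0}
    for m in transfer_metrics:
        totals["incoming_bytes"] += m["incoming_bytes"]
        totals["outgoing_bytes"] += m["outgoing_bytes"]
    return totals

# e.g. on the two dicts printed above (all zeros here, since no
# computation has run yet) this returns zeros for both totals.
```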

Thank you for the answer, and sorry for the slow reply.
Here is the output from a dummy example that processes 2 chunks over 2 workers with Dask distributed. With get_task_stream() I can see the two transfers, with timings similar to those reported in the task stream.
T duration = 7.253170 ms, nbytes = 16 => bandwidth = 2205.932023 B/s tcp://10.91.54.83:43297 → tcp://10.91.54.84:42145
T duration = 6.340742 ms, nbytes = 16 => bandwidth = 2523.363941 B/s tcp://10.91.54.83:43297 → tcp://10.91.54.84:42145
But the bandwidths computed from the reported nbytes do not correspond at all to the one reported in the “Bandwidth Workers” tab, which says 2.24 GiB/s, while from the numbers above you get something around 2.2–2.5 kB/s.

Could you provide an MCVE?

One possibility: the transfer time also takes object serialization into account, not only pure bandwidth. The volumes here are quite small, so they may not be very representative either.
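The effect of a fixed per-transfer overhead can be illustrated with a toy model (the latency and peak-bandwidth numbers below are made up for illustration, not measured on any real cluster):

```python
def effective_bandwidth(nbytes, overhead_s, peak_bw_bps):
    """Apparent bandwidth of a single transfer when a fixed per-transfer
    overhead (latency, serialization, ...) is added to the wire time."""
    duration = overhead_s + nbytes / peak_bw_bps
    return nbytes / duration

# 16 B transfer with 5 ms overhead on a 1 GiB/s link:
# the apparent bandwidth is a few kB/s, dominated by the overhead.
small = effective_bandwidth(16, 5e-3, 2**30)

# 100 MiB transfer on the same link: the overhead is negligible
# and the apparent bandwidth approaches the link's peak.
large = effective_bandwidth(100 * 2**20, 5e-3, 2**30)
```

This is why tiny chunks report kB/s-range bandwidths while the dashboard, which averages over much larger volumes, can show GiB/s.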

Please consider the code below as a simple example. I run it on a cluster with 2 single-threaded workers spread over two different nodes.

import argparse

import numpy as np

import dask.array as da
from dask.distributed import Client
from dask.distributed import performance_report
from dask.distributed import get_task_stream

B2GiB = 1 / 2**30

if __name__ == "__main__":

    p = argparse.ArgumentParser()
    p.add_argument("--scheduler-file", type=str, required=True, help="Path to dask scheduler.json written by dask-scheduler")
    args = p.parse_args()

    client = Client(scheduler_file=args.scheduler_file)
    print("Dashboard:", client.dashboard_link)
    
    waddrs = list(client.scheduler_info()["workers"])
    w0, w1 = waddrs[0], waddrs[1]
    
    size  = (100, 100)
    a = da.from_array(np.full(size, 1.0), chunks=size)
    b = da.from_array(np.full(size, 2.0), chunks=size)
    
    A = client.persist(a, workers=w0, allow_other_workers=False)
    B = client.persist(b, workers=w1, allow_other_workers=False)
    
    with performance_report(filename="dr.html"), get_task_stream(client, plot='save', filename="ts.html") as ts:
        s = (A + B).compute()

    print("Result:", s)

    ts_data = ts.data
    #print(ts_data)
    print("------------------------------------------------------------")
    for el in ts_data:
        for ss in el["startstops"]:
            if ss["action"] == 'transfer':
                duration = ss["stop"] - ss["start"]
                bw = el["nbytes"] / duration
                print(f" T duration = {duration * 1000:.3f} ms, nbytes = {el['nbytes']} => bandwidth = {bw:.3f} B/s ({bw * B2GiB:.3e} GiB/s), {ss['source']} -> {el['worker']}")

With respect to the transfer timings that include (de)serialization times, is there an easy way to access them?
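To make the question concrete, here is the kind of extraction I have in mind, written against plain dicts shaped like the ts.data records above. Whether the task stream actually records a 'deserialize' action alongside 'transfer' in startstops is an assumption on my part:

```python
def deserialize_durations(task_stream_data):
    """Collect the durations of startstops entries whose action is
    'deserialize' (assumed to exist alongside 'transfer'/'compute')."""
    durations = []
    for task in task_stream_data:
        for ss in task.get("startstops", []):
            if ss["action"] == "deserialize":
                durations.append(ss["stop"] - ss["start"])
    return durations

# Works on plain records like those returned by ts.data:
sample = [{"startstops": [
    {"action": "transfer", "start": 0.0, "stop": 0.5},
    {"action": "deserialize", "start": 0.5, "stop": 0.75},
]}]
```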

So, you are right: the bandwidth metric is computed from other information than the task stream. I guess you could get this information somehow; I tracked it down to distributed/scheduler.py and distributed/worker.py in the dask/distributed repository on GitHub.

In the example above, at least part of this metric ends up in ws.metrics['bandwidth'].

The real question is what you really need, and what for. With your example, latency probably accounts for most of the time (not even serialization, as I guessed previously). If you increase the data size, your way of computing bandwidth and the information from the Dashboard get closer and closer.

Thank you, I’ll check the two code parts you pointed to.
I’m trying to recover the bandwidths manually because I find the “Bandwidth Workers” information a bit misleading: the bandwidths are obviously not computed over the runtime of a job but over individual transfers, for which we don’t have the durations, the amounts of bytes transferred, or the de/serialization times.
So sometimes you have jobs running for tens of seconds or several minutes with reported bandwidths in the GB/s range, giving the impression that huge amounts of data were transferred between workers, while this is not the case.
Ideally, as a starting point, it would be nice if the amount of data actually transferred were also reported alongside the bandwidths.
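In the meantime, a small helper over the task-stream records can report the volume together with the bandwidth per worker pair. It reuses the same fields my script above reads ("worker", "nbytes", "startstops", "source"), so the record shape is an assumption carried over from there:

```python
from collections import defaultdict

def per_pair_transfer_stats(task_stream_data):
    """Aggregate task-stream records into per-(source, dest) totals so the
    bandwidth can be reported together with the bytes actually moved."""
    acc = defaultdict(lambda: [0, 0.0])  # pair -> [bytes, seconds]
    for task in task_stream_data:
        for ss in task.get("startstops", []):
            if ss["action"] == "transfer":
                pair = (ss["source"], task["worker"])
                acc[pair][0] += task["nbytes"]
                acc[pair][1] += ss["stop"] - ss["start"]
    return {pair: {"nbytes": nbytes, "seconds": secs, "bandwidth": nbytes / secs}
            for pair, (nbytes, secs) in acc.items()}

# Example with two 16-byte transfers between the same pair of workers:
sample = [
    {"worker": "tcp://b:1", "nbytes": 16,
     "startstops": [{"action": "transfer", "source": "tcp://a:1",
                     "start": 0.0, "stop": 0.25}]},
    {"worker": "tcp://b:1", "nbytes": 16,
     "startstops": [{"action": "transfer", "source": "tcp://a:1",
                     "start": 1.0, "stop": 1.25}]},
]
stats = per_pair_transfer_stats(sample)
```

Reporting `nbytes` next to `bandwidth` makes it immediately obvious when a high bandwidth figure corresponds to only a handful of bytes.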