Is this a fair benchmarking approach?

Is the code below a fair way to benchmark Dask arrays? I want to fully separate the time spent loading from disk from the time spent on the groupby and associated subtraction compute. I'm starting a 4-node cluster with dask-ssh and I've noticed that when I specify num_workers per node as 1, the reported load time is 1450s, whereas with 2 workers per node it is only 191s. First off, why might that be the case, and second, what optimizations is Dask doing to load from netCDF4 files quickly? The compute time is again quite high (1500s) for 1 worker per node but only 200s for 2 workers per node. The data size is 4000x1800x3600 int16s.

thanks

import xarray
import numpy as np
import pandas as pd
from time import perf_counter as timer
from dask.distributed import Client, wait

client = Client("127.0.0.1:8786")

t1 = timer()
ds = xarray.open_dataset("adaptor.mars.internal-1648690136.8168552-21666-2-acc864a1-dd2f-490c-9157-a7f240c73b20.nc", chunks={})
a = ds.isel(time=slice(0,4000))
da = a.t2m
t2 = timer()
wait(da.data.persist())
t3 = timer()
print("load time:", t2 - t1)
print("sync time:", t3 - t2)
print("total load time:", t3 - t1)

start_time = timer()
gb = da.groupby("time.dayofweek")
gbmean = gb.mean("time")
final = gb - gbmean
wait(final.data.persist())
total_time = timer() - start_time

print("total time", total_time)

@DrTodd13 Welcome to Discourse! Interesting question!

I'm starting a 4-node cluster with dask-ssh and I've noticed that when I specify num_workers per node as 1, the reported load time is 1450s, whereas with 2 workers per node it is only 191s. First off, why might that be the case, and second, what optimizations is Dask doing to load from netCDF4 files quickly?

This does seem odd. Would you be able to share a minimal dataset? I'd be happy to reproduce this locally to see what's going on, especially because I'm currently not sure how the data is getting chunked with the chunks={} parameter.
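In the meantime, it would help to know what chunking you actually end up with. Something like this (reusing the ds from your script) should show it:

# Inspect the chunking that chunks={} produced
print(ds["t2m"].chunks)           # per-dimension chunk sizes on the xarray variable
print(ds["t2m"].data.chunksize)   # shape of a single chunk of the underlying dask array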

In general, your workflow of using persist is a good way to benchmark. :smile:

Some notes:

  • In the first section, the result of da.data.persist() isn’t being captured, which might result in the data being loaded again in the next section when you do da.groupby (see the sketch after these notes).
  • Is the “load time” (t2 - t1) only tracking the time to load metadata? I’d assume the actual data won’t be loaded until you call persist.
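For the first point, a minimal sketch of what I mean, reusing the da from your script (here I'm using xarray's .persist(), which returns a new object backed by the persisted dask array):

# Keep a reference to the persisted object so later steps reuse the in-memory data
da = da.persist()      # returns a DataArray backed by persisted chunks
wait(da.data)          # block until all chunks are in worker memory

# The groupby now runs against the persisted data instead of re-reading the file
gb = da.groupby("time.dayofweek")
final = gb - gb.mean("time")
wait(final.data.persist())

That way the second timing section measures only the groupby/subtraction, not a possible re-read of the file.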

My recommendation would be to create a random netCDF4 file with dimensions (30000, 1801, 3600) and datatype int16 and save it, then try my script above to load it and select only the first 4000 rows along the 0th dimension. I don’t know whether this behavior is consistent when starting from a smaller dataset or selecting more/fewer rows, so trying to get my dataset to you would probably be a huge pain.

I already knew that “t2 - t1” was only really going to be metadata load time, and that is what I see. What I labelled “sync time” is where the long file-read time shows up.

Thanks for the reply!


I also just realized that the machines I’m using are huge servers with 128GB of memory. Given that, I’m not sure how to suggest you try to replicate the behavior. Any thoughts?

@DrTodd13 Thanks for getting back to us!

I also just realized that the machines I’m using are huge servers with 128GB of memory. Given that, I’m not sure how to suggest you try to replicate the behavior. Any thoughts?

I was thinking of scaling everything down (data and memory) to check if we can reproduce it, or at least get a sense of what’s going on.
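For example, just a sketch, with worker counts and memory limits that are guesses meant to mimic your 1-vs-2-workers-per-node comparison on a much smaller memory budget:

from dask.distributed import Client, LocalCluster

# Scaled-down stand-in for the 4-node dask-ssh cluster: 4 workers, 1 thread each,
# with a deliberately small memory limit so spilling behaviour shows up sooner.
cluster = LocalCluster(n_workers=4, threads_per_worker=1, memory_limit="4GB")
client = Client(cluster)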

My recommendation would be to create a random netCDF4 file with dimensions (30000, 1801, 3600) and datatype int16 and save it

I’m not familiar with netCDF; could you please share the code you’d use to generate this file?
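My naive guess, so you can correct me, would be something along these lines. The chunking, the variable name t2m, and the coordinate names are assumptions based on your script, and dask is used so the roughly 390 GB array never has to fit in memory:

import numpy as np
import pandas as pd
import xarray as xr
import dask.array as darr

shape = (30000, 1801, 3600)              # ~389 GB of int16, so build it lazily
chunks = (100, 1801, 3600)               # chunk along time only (a guess)

data = darr.random.randint(0, 300, size=shape, chunks=chunks).astype("int16")

ds = xr.Dataset(
    {"t2m": (("time", "latitude", "longitude"), data)},
    coords={
        "time": pd.date_range("2000-01-01", periods=shape[0], freq="H"),
        "latitude": np.linspace(90, -90, shape[1]),
        "longitude": np.linspace(0, 359.9, shape[2]),
    },
)
ds.to_netcdf("random_t2m.nc")            # streams the dask chunks to disk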

The compute time is again quite high (1500s) for 1 worker per node but only 200s for 2 workers per node.

I’d also recommend looking at the diagnostic dashboard to see how the compute changes in both cases – it might help us understand why it’s so different.
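If watching the dashboard live is awkward across the cluster, distributed’s performance_report context manager can capture it to an HTML file you could share here. Roughly:

from dask.distributed import performance_report

# Wrap the compute section so the task stream / worker profile from the dashboard
# is saved to a standalone HTML file.
with performance_report(filename="groupby-benchmark.html"):
    gb = da.groupby("time.dayofweek")
    final = gb - gb.mean("time")
    wait(final.data.persist())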