Parallelizing a for loop using dask delayed and appending the results

I have this toy example that mirrors the real problem I am trying to solve -

import xarray as xr
import numpy as np
import dask
import datetime
import sys

def main():

    ret = setUpData()
    executeCalc(ret)
    
def setUpData():
    
    temperature = 273 + 20 * np.random.random([4,17,73,144])
    lat = np.linspace(-90.,90.,73)
    lon = np.linspace(0.,360. ,144,endpoint=False)
    pres = ["1000","925","850","700","600","500","400","300","250","200","150","100","70","50","30","20","10"]
    level = np.array(pres)
    level = level.astype(float)*100
    time = np.empty((4), dtype='datetime64[ns]')
    time[0] = np.datetime64(datetime.datetime(2023,1,14,0))
    time[1] = np.datetime64(datetime.datetime(2023,1,14,6))
    time[2] = np.datetime64(datetime.datetime(2023,1,14,12))
    time[3] = np.datetime64(datetime.datetime(2023,1,14,18))
    temp = xr.DataArray(temperature,coords=[time,level,lat,lon],dims=['time', 'level','lat', 'lon'])
    ds = xr.Dataset(data_vars={'temperature':temp})
    ds.to_netcdf('testxarray.nc')
    ds1 = xr.open_dataset('testxarray.nc',chunks={'time':1})
    print(ds1)
    tmp = ds.temperature.values
    ret = []
    ret.append(lat)
    ret.append(lon)
    ret.append(level)
    ret.append(tmp)
    return ret

def executeCalc(ret):
    lat,lon,level,tmp = ret
    for i in range (0,tmp.shape[0]):
        tmpInstant = tmp[i,:,:,:]
        potemp = pot(tmpInstant,lat,lon,level)
        
def pot(tmp,lat,lon,pres):
    cp = 1004.0
    md = 28.9644
    R = 8314.41
    Rd = R/md
    nlat = len(lat)
    nlon = len(lon)
    nlevel = len(pres)
    potemp = np.zeros((nlevel,nlat,nlon))
    potemp = tmp * (100000./pres[:,None,None]) ** (Rd/cp)

    return potemp
    
main()

What I want to do is to run this for loop in parallel -

potempList = []
for i in range(0, tmp.shape[0]):
    tmpInstant = tmp[i,:,:,:]
    potemp = pot(tmpInstant,lat,lon,level)
    potempList.append(potemp)

and append the results of potemp. I am aware that I need to use dask delayed somewhere in this, but I am not sure at all how to wrap this function call with dask delayed. I have 4 CPU nodes on this test box and the serial execution takes around 100 seconds; I am hoping to reduce that to 25 seconds.

Hi @winash12, welcome here!

First, thanks so much for giving a toy example of your problem, this is really appreciated.

Unfortunately, there might be a few mistakes in it. Even after putting the code you want to run in parallel inside the executeCalc function, the example runs in only a few milliseconds on my laptop, so it is hard to check whether a parallel version will be faster. Do you have values that make it run a bit longer?

Anyway, the solution should be something like:

import dask
from dask import delayed

def executeCalc(ret):
    lat,lon,level,tmp = ret
    potempList = []
    for i in range (0,tmp.shape[0]):
        tmpInstant = tmp[i,:,:,:]
        # build one lazy task per time step instead of computing it immediately
        potempList.append(delayed(pot)(tmpInstant,lat,lon,level))
    # trigger all the tasks at once; Dask executes them in parallel
    results = dask.compute(potempList)

By default, the Delayed API will use a threaded scheduler; this might not be efficient for your workload (GIL contention or things like that). In that case, you might want to look at using a LocalCluster. See

https://docs.dask.org/en/stable/scheduling.html#dask-distributed-local
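
For instance, a minimal sketch combining Delayed with a local cluster of worker processes could look like this (it reuses your pot and setUpData functions; the worker count and single thread per worker are just example settings):

from dask.distributed import Client, LocalCluster
from dask import delayed
import dask

if __name__ == "__main__":
    # 4 single-threaded worker processes, as an example sizing
    cluster = LocalCluster(n_workers=4, threads_per_worker=1)
    client = Client(cluster)

    lat, lon, level, tmp = setUpData()
    # build one lazy task per time step, then compute them all in parallel
    tasks = [delayed(pot)(tmp[i, :, :, :], lat, lon, level) for i in range(tmp.shape[0])]
    results = dask.compute(*tasks)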

@guillaumeeb Wow, thanks for the response! I was told that my example was too big for this site, so I had sent a note to the admin to delete the question and had not yet received a response.

Regarding the point you raise - a very valid one. I had just come up with that example to show the kind of problem I have. In reality the 4-dimensional array could be something like this:

temperature = 273 + 20 * np.random.random([120,35,722,1044])

I will try the code sample you sent me and will be sure to respond back to your post!

@guillaumeeb This is the modified code based on your recommendations. Am I doing it the proper way? It runs fine, but the serial execution seems faster: this version took 1 second while the serial algorithm took 0.0024 seconds. You mentioned you ran yours in a few milliseconds; have I gone wrong somewhere?

import xarray as xr
import numpy as np
import dask
from dask import delayed
from dask.distributed import Client
import datetime
import time
def main():
    
    if __name__ == "__main__":

        client = Client()
        tmp,pres = setUpData()
        startTime = time.time()
        executeCalc(tmp,17,73,144,pres)
        stopTime = time.time()
        print(stopTime-startTime)
    
def setUpData():
    
    temperature = 273 + 20 * np.random.random([4,17,73,144])
    lat = np.linspace(-90.,90.,73)
    lon = np.linspace(0.,360. ,144,endpoint=False)
    pres = ["1000","925","850","700","600","500","400","300","250","200","150","100","70","50","30","20","10"]
    level = np.array(pres)
    level = level.astype(float)*100
    time = np.empty((4), dtype='datetime64[ns]')
    time[0] = np.datetime64(datetime.datetime(2023,1,14,0))
    time[1] = np.datetime64(datetime.datetime(2023,1,14,6))
    time[2] = np.datetime64(datetime.datetime(2023,1,14,12))
    time[3] = np.datetime64(datetime.datetime(2023,1,14,18))
    temp = xr.DataArray(temperature,coords=[time,level,lat,lon],dims=['time', 'level','lat', 'lon'])
    ds = xr.Dataset(data_vars={'temperature':temp})
    ds.to_netcdf('testxarray.nc')
    ds1 = xr.open_dataset('testxarray.nc',chunks={'time':1})
    tmp = ds.temperature.values
    return tmp,level

def executeCalc(tmp,lenlevel,lenlat,lenlon,pres):
    potempList = []
    for i in range (0,tmp.shape[0]):
        tmpInstant = tmp[i,:,:,:]
        potempList.append(delayed(pot)(tmpInstant,lenlevel,lenlat,lenlon,pres))
        
    results = dask.compute(potempList,scheduler='processes',num_workers=4)
                          
def pot(tmp,nlev,lat,lon,pres):
    potemp = np.zeros((17,73,144))
    potemp = tmp * (100000./pres[:,None,None])
    return potemp
    
main()

Two things come to my mind:

  1. An algorithm that takes 0.0024s in serial most probably doesn't need to be parallelized. Dask won't be efficient at this scale. You want to try with some bigger data, so that the serial duration is at least a few seconds. In your current test it takes more time to start 4 Dask worker processes than to compute anything.
  2. Previously, I only took a look at the section you wanted to parallelize, but did not take the time to look at the example as a whole. I think in your case you'll want to use Dask Arrays directly rather than Delayed. I will take a deeper look at it.

Okay, so I've taken out some extraneous things in your code, made the data a bit bigger, and come up with this:

import numpy as np
import dask
import dask.array as da
import datetime
import sys

def pot(tmp,lat,lon,pres):
    cp = 1004.0
    md = 28.9644
    R = 8314.41
    Rd = R/md
    nlat = len(lat)
    nlon = len(lon)
    nlevel = len(pres)
    potemp = tmp * (100000./pres[:,None,None]) ** (Rd/cp)

    return potemp

lat = np.linspace(-90.,90.,1444)
lon = np.linspace(0.,360. ,2088,endpoint=False)
pres = ["1000","925","850","700","600","500","400","300","250","200","150","100","70","50","30","20","10"]
level = np.array(pres)
level = level.astype(float)*100

Above is the part common to both the Dask and Numpy versions. Then, the Numpy computation:

temperature_np = 273 + 20 * np.random.random([4,17,1444,2088])
potempList = []
for i in range (0,temperature_np.shape[0]):
    tmpInstant = temperature_np[i,:,:,:]
    potemp = pot(tmpInstant,lat,lon,level)
    potempList.append(potemp)

Dask equivalent computation:

temperature = 273 + 20 * da.random.random([4,17,1444,2088], chunks=(1,17,1444,2088))
result = temperature.map_blocks(pot,lat,lon,level)
result_np = result.compute()

You can also try the Dask computation with a LocalCluster; just put this code before it:

from dask.distributed import Client
client = Client(n_workers=4)
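
To compare the two on your side, a rough timing like this should do (it just reuses the names defined in the snippets above):

import time

start = time.perf_counter()
# serial Numpy version, one time step after the other
potempList = [pot(temperature_np[i], lat, lon, level) for i in range(temperature_np.shape[0])]
print("numpy:", time.perf_counter() - start)

start = time.perf_counter()
# Dask version, computed on the LocalCluster workers
result_np = result.compute()
print("dask :", time.perf_counter() - start)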

However, I did not achieve any speedup using Dask, and I see two main reasons:

  1. The scale is still too small; it takes about 5 seconds for either Numpy or Dask to compute your code.
  2. During that time, looking at the Dask Dashboard, we do see 4 computations running in parallel, each lasting about 1.5s. But then you've got the time to send the computation to Dask, and more importantly the time to get the result back, which means serializing 1.5GB of data from the workers to the client.

So the question I have is: what do you want to do next with the computed values? Do you want to write them somewhere? If we could avoid point 2, Dask would be more interesting.

For point 1, we should try with the real-world example size you mentioned.
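
For point 2, if you don't actually need the values back in the client process, one option (just a sketch, and it assumes the zarr library is installed) is to write the result straight from the workers, for instance:

# instead of result_np = result.compute(), write each chunk from the worker that
# computed it; only metadata comes back to the client, so the 1.5GB transfer disappears
result.to_zarr("potemp.zarr", overwrite=True)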

@guillaumeeb Wow thanks for the long detailed response. Much appreciated from me !

I have actually simplified the algorithm to show the group here what I am trying to parallelize.

The actual algorithm consists of the following steps:

  1. Do a linear interpolation from temperature to pressure (which uses the pot function).
  2. Do another linear interpolation for the 4-D velocities (time, level, lat, lon).
  3. Use the interpolated velocities and pressures to calculate a quantity called vorticity (another 4-D array).

The above 3 steps happen in 3 different functions, which are then combined inside an outer function call. In theory the entire algorithm can be parallelized, including the calculation of pot, the linear interpolations, and the calculation of vorticity. Since I have 24 time steps in a day and all time steps are independent, they can be sent to 4 different CPU nodes.

So far I have not thought about writing to disk, but it's very much possible with massive meteorological data sets, for example when I have to deal with 40 years of climate data and need the vorticity anomaly for a particular day. At the present time I am only calculating vorticity for a single time instant.

If you do things like that, the computation on each temporal step will be much longer, so as you say you'll benefit a lot from parallelizing the algorithm. Just add a time.sleep(2) in the pot function and you'll see that Dask helps a lot.
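
For instance, a slowed-down pot like this (just a sketch to emulate the heavier per-step computation) makes the difference obvious on the Dashboard:

import time

def pot(tmp, lat, lon, pres):
    time.sleep(2)  # stand-in for the extra interpolation and vorticity work per time step
    cp = 1004.0
    Rd = 8314.41 / 28.9644
    return tmp * (100000. / pres[:, None, None]) ** (Rd / cp)

With one chunk per time step and 4 workers, the four 2-second sleeps run concurrently, so the parallel version pays that cost roughly once instead of four times.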

By the way, this made me notice that there was a missing kwarg in my Dask version, which is especially important when dealing with a dask array that has only a few chunks:

temperature = 273 + 20 * da.random.random([4,17,1444,2088], chunks=(1,17,1444,2088))
result = temperature.map_blocks(pot,lat,lon,level,meta=np.array(()))
result_np = result.compute()

Without the meta kwarg, Dask will try to infer the output type by calling the method twice before submitting the graph.

This is what's important: in this case, Dask will help you with one API or the other.

With Dask, and in parallel or distributed computing in general, you need to think about how you read data from disk (don't read on the client side and then send the data to the workers); how your code is parallelized/distributed (do you split the data, do you split the code, how do you split the data); and how you write the outputs (same as for reading: write directly from the workers).
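
Putting those three points together, a rough end-to-end sketch for your case could look like this (the output file name, chunk size and the choice of zarr as output format are only placeholders):

import xarray as xr
from dask.distributed import Client

if __name__ == "__main__":
    client = Client(n_workers=4)

    # read lazily, one chunk per time step, so the workers load their own slices
    ds = xr.open_dataset("testxarray.nc", chunks={"time": 1})

    Rd = 8314.41 / 28.9644
    cp = 1004.0
    # lazy, chunk-wise potential temperature; xarray broadcasts level over lat/lon
    potemp = ds["temperature"] * (100000. / ds["level"]) ** (Rd / cp)

    # write directly from the workers instead of collecting the result on the client
    potemp.to_dataset(name="potemp").to_zarr("potemp_out.zarr", mode="w")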