How can I take advantage of a nested function being parallelizable when it is enclosed in an already embarrassingly parallel computation on a cluster?

Running this distance function on its own locally is an order of magnitude faster if I set parallel=True. But I place it inside a function that is submitted to a cluster of workers (on this deployment), as seen below, and I wonder whether there is a way to take advantage of the nested function's parallelizability or not.

Timing the full embarrassingly parallel computation with the nested function's parallel option set to True or False does not seem to make any difference in wall time. So I am guessing that running a nested function in parallel does not make sense in this case, and that I should spend my effort on parallelizing the enclosing function instead.

Before I conclude that, I wanted to ask if someone knows of an alternative I am not aware of that takes advantage of the nested function's parallelizability. Also, what do others do in a similar situation?

The example below is supposed to be reproducible:
import numpy as np
import numpy.ma as ma
import math
from numba import njit, prange

@njit(fastmath=True, parallel=False)
def dist_loop_njit_fm_parallelFalse( lat0, lon0, lat, lon ):

    # Haversine distance (in metres) from (lat0, lon0) to every point in (lat, lon)
    distances = np.empty_like(lat)
    R_earth = 6.371e6
    phi1 = math.radians(lat0)
    lambda1 = math.radians(lon0)
    for i in prange(lat.size):
        phi2 = math.radians(lat[i])
        lambda2 = math.radians(lon[i])
        a = math.sin((phi2 - phi1)/2.0)**2
        b = math.cos(phi1) * math.cos(phi2) * math.sin((lambda1 - lambda2)/2.0)**2
        distances[i] = R_earth * 2 * math.asin( math.sqrt(a+b) )
        
    return distances


def function( lat0, lon0, lat_scattered, lon_scattered, window_size, ...  ):

    # Dask resolves the scattered futures to the underlying NumPy arrays
    lat, lon = lat_scattered, lon_scattered
    distance = dist_loop_njit_fm_parallelFalse( lat0, lon0, lat, lon )
    in_window = np.where(distance < window_size)[0]
    lat, lon = lat[in_window], lon[in_window]

    # Rest of code...

    return some_results


def run_function(client, window_size, workers = None, ... ):

    # Load data
    n=2_000_000
    hlat, llat = 90.0, -90.0
    hlon, llon = 180.0, -180.0
    lat = np.random.uniform(low=llat, high=hlat, size=(n,))
    lon = np.random.uniform(low=llon, high=hlon, size=(n,))
    # Scatters that data to cluster
    lat_scattered = client.scatter(lat)
    lon_scattered = client.scatter(lon)
    
    # make iterable
    latg = np.arange(-80,80, 5)
    long = np.arange(-180,180, 5)
    lonG, latG = np.meshgrid(long,latg)
    lons = ma.MaskedArray(lonG).compressed()
    lats = ma.MaskedArray(latG).compressed()

    # Starts embarrassingly parallel computation
    list_of_submitted_tasks = []
    for lat0, lon0 in zip( lats, lons ):
        submitted = client.submit(function, 
            lat0, lon0,
            lat_scattered, lon_scattered,
            window_size,
            pure = False,
            workers = workers
        )
        list_of_submitted_tasks.append( submitted )

    return list_of_submitted_tasks

# Start client and cluster, and run
window_size = 100e3
results = client.gather(run_function(client, window_size, workers = None ))
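
For reference, the parallel variant I compare against locally only differs in the decorator; here is a minimal sketch of it (dist_loop_njit_fm_parallelTrue is just the name I use for the comparison, and the timing numbers will depend on the machine):

@njit(fastmath=True, parallel=True)
def dist_loop_njit_fm_parallelTrue( lat0, lon0, lat, lon ):

    distances = np.empty_like(lat)
    R_earth = 6.371e6
    phi1 = math.radians(lat0)
    lambda1 = math.radians(lon0)
    for i in prange(lat.size):   # executed in parallel by Numba's threading layer
        phi2 = math.radians(lat[i])
        lambda2 = math.radians(lon[i])
        a = math.sin((phi2 - phi1)/2.0)**2
        b = math.cos(phi1) * math.cos(phi2) * math.sin((lambda1 - lambda2)/2.0)**2
        distances[i] = R_earth * 2 * math.asin( math.sqrt(a+b) )

    return distances

# Quick local comparison (call each function once first so compilation is not timed):
# dist_loop_njit_fm_parallelFalse(0.0, 0.0, lat, lon)
# dist_loop_njit_fm_parallelTrue(0.0, 0.0, lat, lon)
# %timeit dist_loop_njit_fm_parallelFalse(0.0, 0.0, lat, lon)
# %timeit dist_loop_njit_fm_parallelTrue(0.0, 0.0, lat, lon)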

The point is to find a balance between the high-level parallelization performed by Dask and the lower-level parallelization inside your function. If you run thousands of calls of this function on a Dask cluster with one hundred cores, there is no need for nested function parallelization: you will already use all your resources through the sheer number of calls to your function.

Moreover, parallelization inside a function is often not completely optimal: you rarely divide the execution time by 4 by using 4 threads or processes. So for large-scale processing that launches the same function many times, it is often better to just use one core per call.
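
If you want to make "one core per call" explicit, one way to do it (just a sketch assuming a local deployment; adapt the worker setup to your own cluster manager) is to give each worker a single thread, so every submitted call of function gets exactly one core:

import os
from dask.distributed import Client, LocalCluster

# One single-threaded worker per core: each task runs on one core,
# and the number of tasks is what saturates the machine.
cluster = LocalCluster(n_workers=os.cpu_count(), threads_per_worker=1)
client = Client(cluster)

window_size = 100e3
results = client.gather(run_function(client, window_size, workers=None))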

This is probably because you already use all the cores of the underlying infrastructure, even without in-function parallelization, simply because of the number of tasks you launch. Parallelizing inside the function could even make things worse in this case, since each task would try to spawn its own threads on cores that are already busy.


This makes things clearer for me, thanks a lot @guillaumeeb! :+1:
