Running this distance-function individually locally, is 1 magnitude faster if I set parallel=True
. But I place it inside a function which is submitted to a cluster of workers, (on this deployment), as seen below, and wonder if there is a way to take advantage of the nested functions parallelizability or not.
Timing the full embarrasingly parallel computation with the nested functions parallel-option set to True
or False
, does not seem to make any difference on the walltime. So I am guessing running a nested function in parallel does not make sense in this case, and I should use resources on parallelizing the enclosing function.
Before I conclude with that, I wanted to ask if someone know of an alternative I am not aware of, that takes advantage of the nested functions parallelizability? Also, what do others do in a similar situation?
Example is supposed to be reproducible
import numpy as np
import numpy.ma as ma
import math
from numba import njit, prange
@njit(fastmath=True, parallel=False)
def dist_loop_njit_fm_parallelFalse( lat0, lon0, lat, lon ):
distances = np.empty_like(lat)
R_earth = 6.371e6
phi1 = math.radians(lat0)
lambda1 = math.radians(lon0)
for i in prange(lat.size):
phi2 = math.radians(lat[i])
lambda2 = math.radians(lon[i])
a = math.sin((phi2 - phi1)/2.0)**2
b = math.cos(phi1) * math.cos(phi2) * math.sin((lambda1 - lambda2)/2.0)**2
distances[i] = R_earth * 2 * math.asin( math.sqrt(a+b) )
return distances
def function( lat0, lon0, lat_scattered, lon_scattered, window_size, ... ):
distance = dist_loop_njit_fm_parallelFalse( lat0, lon0, lat, lon )
in_window = np.where(distance < window_size)[0]
lat, lon = lat[in_window], lon[in_window]
# Rest of code...
return some_results
def run_function(client, window_size, workers = None ... ):
# Load data
n=2_000_000
hlat, llat = 90.0, -90.0
hlon, llon = 180.0, -180.0
lat = np.random.uniform(low=llat, high=hlat, size=(n,))
lon = np.random.uniform(low=llon, high=hlon, size=(n,))
# Scatters that data to cluster
lat_scattered = client.scatter(lat)
lon_scattered = client.scatter(lon)
# make iterable
latg = np.arange(-80,80, 5)
long = np.arange(-180,180, 5)
lonG, latG = np.meshgrid(long,latg)
lons = ma.MaskedArray(lonG).compressed()
lats = ma.MaskedArray(latG).compressed()
# Starts embarrasingly parallel computation
list_of_submitted_tasks = []
for lat0, lon0 in zip( lats, lons ):
submitted = client.submit(function,
lat0, lon0,
lat_scattered, lon_scattered,
window_size,
pure = False,
workers = workers
)
list_of_submitted_tasks.append( submitted )
return list_of_submitted_tasks
# Start client and cluster, and run
window_size = 100e3
results = client.gather(run_function(client, window_size, workers = None ))