Hi, I’m trying to use client.submit() to submit a lazy function to distributed workers, and I have a question. The following example computes eigendecompositions on parallel workers:
def eigh(A:da.Array, n=None):
    """Eigendecompose ``A`` with ``xp.linalg.eigh`` and persist the outputs.

    The decomposition is applied through ``da.apply_gufunc`` with the
    signature ``"(i,j)->(i),(i,j)"``, i.e. one ``(i)``-shaped output and one
    ``(i,j)``-shaped output per ``(i,j)`` input matrix.

    Args:
        A: Dask array to decompose. The caller in this snippet passes
            matrices of the form ``conj(a.T) @ a``.
        n: Optional chunk size. When given, the first output is rechunked to
            ``n`` and the second to ``(n, n)``; when ``None`` the chunking
            produced by ``apply_gufunc`` is kept.

    Returns:
        Whatever ``persist(e, u)`` returns for the two outputs
        (presumably ``dask.persist`` -- confirm against the file's imports).
    """
    # Artificial one-second delay kept from the original example.
    time.sleep(1)
    e, u = da.apply_gufunc(
        xp.linalg.eigh, "(i,j)->(i),(i,j)", A, allow_rechunk=True
    )
    # Identity check: `!= None` would go through __ne__, which is elementwise
    # for array-like values and therefore not a reliable "was n given" test.
    if n is not None:
        e = da.rechunk(e, chunks=n)
        u = da.rechunk(u, chunks=(n, n))
    # Returning the lazy pair (e, u) without persist() was observed to leave
    # the workers idle; persist() is what starts the computation here.
    return persist(e, u)
if __name__ == "__main__":
from dask_mpi import initialize
initialize(....)
time.sleep(10)
client = Client()
m = 16
N = 10000
n = N//2
rs = da.random.RandomState(1234, xp.random.RandomState)
#generate 16 hermitian matrices and persist them to memory
A = []
for i in range(m):
a = rs.standard_normal(size=(N,N), chunks=(n,n)) + 1j*rs.standard_normal(size=(N,N), chunks=(n,n))
a = da.conj(a.T) @ a
a = a.persist()
A.append(a)
wait(A)
#submit the eigen decomposition function to workers and wait the futures until finished
futures = []
t0 = time.time()
for a in A:
futures.append(client.submit(eigh, a, n))
wait(futures)
t1 = time.time()
print("submit time= {:.2f}".format(t1-t0))
#gather results from workers
t0 = time.time()
results = client.gather(futures)
wait(results)
t1 = time.time()
print("gather results time= {:.2f}".format(t1-t0))
Is returning persist() from eigh() the only way to make the eigendecomposition compute immediately? I tried returning the results without persist(), and the workers didn’t do any computation at all.