Question about submitting lazy function with client.submit()

Hi, I’m trying to use client.submit() to submit lazy function to distributed workers, but have a question. I post the following example, that computes eigen decomposition with parallel workers:

def eigh(A:da.Array, n=None):
    #just sleep for 1 sec
    time.sleep(1)
    e, u = da.apply_gufunc(xp.linalg.eigh, "(i,j)->(i),(i,j)", A, allow_rechunk=True)

    if n != None:
        e = da.rechunk(e, chunks=n)
        u = da.rechunk(u, chunks=(n,n))

    return persist(e, u)
    #return e, u

if __name__ == "__main__":
    from dask_mpi import initialize
    initialize(....)
    time.sleep(10)
    client = Client()

    m = 16
    N = 10000
    n = N//2
    rs = da.random.RandomState(1234, xp.random.RandomState)

    #generate 16 hermitian matrices and persist them to memory
    A = []
    for i in range(m):
        a = rs.standard_normal(size=(N,N), chunks=(n,n)) + 1j*rs.standard_normal(size=(N,N), chunks=(n,n))
        a = da.conj(a.T) @ a
        a = a.persist()
        A.append(a)
    wait(A)

    #submit the eigen decomposition function to workers and wait the futures until finished
    futures = []
    t0 = time.time()
    for a in A:
        futures.append(client.submit(eigh, a, n))
    wait(futures)
    t1 = time.time()
    print("submit time= {:.2f}".format(t1-t0))
    
    #gather results from workers
    t0 = time.time()
    results = client.gather(futures)
    wait(results)
    t1 = time.time()
    print("gather results time= {:.2f}".format(t1-t0))

Is it the only way to compute the eigen decomposition immediately by returning persist() function in eigh() ? I have tried returning without persist(), and the workers didn’t do any computation at all.

If you want the result from a Dask collection computation, you’ve got to call compute() on it, either it will stay as a graph state.

Your workflow is a bit complicated in the sense that you are using Dask collections inside Future API calls, so you have two levels of parallelism. So depending on the collection size, it might or might not be OK to call compute on Worker side.

Thank you. Since in my work flow, I have to compute eigen decomposition of multiple independent matrices, I want to do it in a more efficient way.
I also tried using the simplest way as following:

    es = []
    us = []
    for a in A:
        e, u = eigh(a, n)
        es.append(e)
        us.append(u)
    es, us = persist(es, us)
    wait([es,us])

I have tried computing eigen decomposition of 32 10^5*10^5 hermitian matrices with 4, 8, 16, 32 gpu workers, and compared with the case which using client.submit().
In the case of using client.submit(), the computation time is bout 69~77s for 4 workers, 38~40s for 8 workers, 23~24s for 16 workers, 14~15s for 32 workers.
In the case of the simplest way, the computation time is bout 73~80s for 4workers, 42~45s for 8workers, 39~43s for 16 workers, 41~45 for 32 workers.
Obviously, the former case is strong scaling. In contrast to the former case, the latter case is strong scaling up to 8 workers.
I want to know what’s wrong with the above code, or is there any better way to let the computation be parallel?

I’m not sure if there is something wrong in your code. You are just using a different eigh function (what is xp library?).

Actually, I think you shouldn’t have to use Client.submit at all. If in your second snippet you use the same eigh function, I’m not sure why this is slower. Could you put together a complete reproducible example?