Client Run Function Performance

Hi, team,

I use Dask as the distributed infrastructure to train an AI model. I have a function that computes a result on each worker, and I use the Dask client to trigger the calculation as follows.

from dask.distributed import get_client

cli = get_client(address=f"tcp://{my_host}:{my_port}")
cli.wait_for_workers(n_workers=my_worker_num)
my_result = cli.run(my_calculation_func, **kwargs, wait=True)

I am sure that my compute function (“my_calculation_func” in the snippet above) takes less than 1 second, yet the “cli.run” call takes more than 10 seconds to finish when I use 30 workers. Moreover, the more workers I use, the longer “cli.run” takes: with 120 workers it needs more than 50 seconds, which is unacceptable for my use case.
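For reference, the timings above were measured with something like the following (a minimal sketch; the scheduler address and the sleep are placeholders for my real setup):

import time
from dask.distributed import get_client


def my_calculation_func():
    # placeholder for the real computation, which finishes in well under 1 second
    time.sleep(0.5)
    return "done"


cli = get_client(address="tcp://scheduler:8786")  # placeholder scheduler address
cli.wait_for_workers(n_workers=30)

start = time.perf_counter()
results = cli.run(my_calculation_func, wait=True)  # dict of results keyed by worker address
print(f"cli.run took {time.perf_counter() - start:.2f} s across {len(results)} workers")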

My Dask version is 2022.2.0. I have also tested “cli.submit(my_calculation_func, pure=False)”, and it performs even worse than “cli.run”. Each worker runs on a separate server, and very little data is transferred between the workers and the client, so network bandwidth should not be the bottleneck.

My question: is this time cost of the “run” function reasonable? Is there any way to optimize it to reduce the duration?

Thanks in advance.
Chris Ding

@cuauty Apologies for the delay in response! Would you be able to share a minimal, reproducible example, maybe with sleep to simulate work?

Something like:

from time import sleep
from dask.distributed import Client

client = Client(n_workers=30)


def my_1s_func(a=1):
    sleep(1)
    return a

%%time
res = client.submit(my_1s_func, pure=False)
res.result() 
# Wall time: 1.05 s

%%time
res1 = client.run(my_1s_func, wait=True)
# Wall time: 1.05 s

I’m not able to reproduce it directly, so we may need more information about your Dask cluster setup and my_calculation_func.
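In the meantime, details of the cluster itself would also help, for example the output of client.scheduler_info() (a minimal sketch, assuming the scheduler address from your post):

from dask.distributed import Client

client = Client("tcp://my_host:my_port")  # address of your existing scheduler

info = client.scheduler_info()
print(len(info["workers"]), "workers connected")
for addr, worker in info["workers"].items():
    # per-worker thread count and memory limit as seen by the scheduler
    print(addr, worker["nthreads"], "threads,", worker["memory_limit"], "bytes memory limit")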