Latency between graph construction and start of calculation

Hi,
Happy to belong to the Dask community.
I'm worried about the latency before computation starts when I face a great number of tasks.
Here is a small example:

import dask.array as da
from datetime import datetime as dtt
def dt_log(text='', precision=4, start=None):
    # Print a timestamped message; if `start` is given, also show the
    # elapsed time since it, with fractional seconds rounded to `precision`.
    now = dtt.now()
    dur = str(now - start).split('.')[0] + str(round((now - start).microseconds / 1e6, precision))[1:] if start else None
    print(f"{now.isoformat(sep=' ', timespec='milliseconds')}({dur if start else ''}) - {text}", flush=True)
    return now
size = 1_000_000
dt_log("Zero..")
Zero = da.asarray([0.]*size, chunks=size//1_000)
dt_log("One..")
One = da.asarray([1./size]*size, chunks=size//1_000)

if __name__ == '__main__':
    from dask.distributed import Client
    import dask
    dask.config.set({"array.slicing.split_large_chunks": True,
                     "scheduler":'threads'}) 
    client = Client(processes=False, silence_logs='error')
    print(client, client.dashboard_link, sep='\n')
    Fibo = [Zero, One]

    N = 100
    for i in range(2,N+1):
        Fibo += [Fibo[-2]+Fibo[-1]]
    Fibo_sum = [x.sum() for x in Fibo]
    st = dt_log("calculation..")
    Fibo_sums = da.stack(Fibo_sum)
    F = Fibo_sums.compute()
    dt_log("..calculation" , start=st)

    Result1 = [round(x) for x in list(F)]
    print(f"Fibo({N}): {Result1}")
    client.close()
    client.cluster.close()

There is a latency of about 30 seconds before the calculation starts. This latency increases with the number of tasks, so much that for real cases it never starts.

I also sometimes get this error message:

2023-01-17 07:54:43.409() - Zero..
2023-01-17 07:54:43.476() - One..
<Client: 'inproc://192.168.8.75/15688/1' processes=1 threads=20, memory=31.69 GiB>
http://192.168.8.75:8787/status
2023-01-17 07:54:44.125() - calculation..
2023-01-17 07:54:55,303 - tornado.application - ERROR - Exception in callback <bound method BokehTornado._keep_alive of <bokeh.server.tornado.BokehTornado object at 0x00000290E9778F70>>
Traceback (most recent call last):
  File "C:\Python310\lib\site-packages\tornado\ioloop.py", line 921, in _run
    val = self.callback()
  File "C:\Users\flho\AppData\Roaming\Python\Python310\site-packages\bokeh\server\tornado.py", line 760, in _keep_alive
    c.send_ping()
  File "C:\Users\flho\AppData\Roaming\Python\Python310\site-packages\bokeh\server\connection.py", line 93, in send_ping
    self._socket.ping(str(self._ping_count).encode("utf-8"))
  File "C:\Python310\lib\site-packages\tornado\websocket.py", line 444, in ping
    raise WebSocketClosedError()
tornado.websocket.WebSocketClosedError
2023-01-17 07:56:45.710(0:02:01.5849) - ..calculation
Fibo(100): [0, 1, 1, 2, 3, 5, 8, 13, 21, 34, 55, 89, 144, 233, 377, 610, 987, 1597, 2584, 4181, 6765, 10946, 17711, 28657, 46368, 75025, 121393, 196418, 317811, 514229, 832040, 1346269, 2178309, 3524578, 5702887, 9227465, 14930352, 24157817, 39088169, 63245986, 102334155, 165580141, 267914296, 433494437, 701408733, 1134903170, 1836311903, 2971215073, 4807526976, 7778742049, 12586269025, 20365011074, 32951280099, 53316291173, 86267571272, 139583862445, 225851433717, 365435296162, 591286729879, 956722026041, 1548008755920, 2504730781961, 4052739537881, 6557470319842, 10610209857723, 17167680177565, 27777890035288, 44945570212853, 72723460248141, 117669030460994, 190392490709135, 308061521170129, 498454011879264, 806515533049393, 1304969544928657, 2111485077978050, 3416454622906707, 5527939700884756, 8944394323791461, 14472334024676212, 23416728348467672, 37889062373143904, 61305790721611584, 99194853094755456, 160500643816367040, 259695496911122560, 420196140727489344, 679891637638612224, 1100087778366101504, 1779979416004713728, 2880067194370815488, 4660046610375530496, 7540113804746345472, 12200160415121876992, 19740274219868217344, 31940434634990112768, 51680708854858301440, 83621143489848410112, 135301852344706727936, 218922995834555072512, 354224848179261931520]


Do you know why this latency occurs and whether it can be bypassed?
I work on Windows 10 with Dask 2023.1.0 (I also got the problem on 2022.12).

Thanks so much for your help !

Hi @Francois,

It's a bit hard, just from reading your example, to estimate the number of tasks you get in the generated graph.

In general, Dask begins to have a hard time with graphs of more than 100_000 or 1_000_000 tasks. In this case, when working with Dask Arrays, the best thing to do is to increase the chunk size so you have fewer chunks. (You can also count the tasks yourself; see the sketch below.)
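
For reference, here is a minimal sketch of how you can count the tasks yourself. It relies on the __dask_graph__() hook that every Dask collection exposes; the array sizes and chunk shapes are only illustrative:

import dask.array as da

# 1000 chunks of 1000 elements each: thousands of tasks
small = da.zeros(1_000_000, chunks=1_000)
print(len((small + small).sum().__dask_graph__()))

# 10 chunks of 100_000 elements each: a few dozen tasks
large = da.zeros(1_000_000, chunks=100_000)
print(len((large + large).sum().__dask_graph__()))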

Hi Guillaume,

Thanks a lot for your answer! I'll check the number of tasks.
The question I have is: why does nothing (seem to) happen during this time lapse?
Unfortunately, I can't increase the chunk size… I'll have to serialize my calculations manually… which is what I wanted to avoid.

There are about 235,000 tasks indeed.

During that time, the Scheduler is working hard, receiving and then analyzing this graph. Unfortunately, I can't say a lot more, as I'm not an expert in Scheduler mechanisms. I agree that it's a problem that no feedback about what is happening behind the scenes is given to the end user…

What I know is that there are several optimizations that occur or can be activated; maybe one of those would be of some help in your case? One that you can trigger explicitly is sketched below.
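
This is only a sketch, and I can't promise it helps here: dask.optimize returns equivalent collections whose graphs have been optimized client-side (for instance, linear chains of tasks fused) before anything reaches the scheduler.

import dask

# Optimize the graph client-side (e.g. fuse linear task chains)
# before submitting; dask.optimize returns a tuple of collections.
(Fibo_sums_opt,) = dask.optimize(Fibo_sums)
F = Fibo_sums_opt.compute()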

Unfortunately yes, this is the other solution: just break your for loop into several pieces, as in the sketch below.
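
As an illustration, here is a minimal sketch of such a split for your Fibonacci loop, assuming the distributed Client from your example. Persisting the two running arrays every few iterations computes them on the cluster, so later iterations build on already-computed data instead of an ever-deeper graph; the step value of 25 is only an illustrative choice.

step = 25  # illustrative: cut the graph every 25 iterations
Fibo = [Zero, One]
for i in range(2, N + 1):
    Fibo.append(Fibo[-2] + Fibo[-1])
    if i % step == 0:
        # persist() starts computation and returns arrays backed by
        # futures, replacing the graph accumulated so far
        Fibo[-2] = Fibo[-2].persist()
        Fibo[-1] = Fibo[-1].persist()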

I imagine the example above is not your real use case ;), could you give something closer to what you're trying to accomplish? Why can't you increase the chunk size?