Why does submit overhead increase exponentially for Fibonacci example?

I’m trying to write a how-to guide for Panel + Dask. I figured I would build on Dask's Async Web Server example, which submits a Fibonacci function asynchronously.

From the Dask efficiency docs I would expect the per-task overhead to be around 1 ms.

What I see in practice is ~7 ms, and unfortunately once n >= 32 the overhead starts increasing dramatically.

Why does this happen? Is it expected? How can I improve the situation?

cluster.py

# cluster.py
from dask.distributed import LocalCluster

DASK_SCHEDULER_PORT = 64719
DASK_SCHEDULER_ADDRESS = f"tcp://127.0.0.1:{DASK_SCHEDULER_PORT}"

if __name__ == '__main__':
    cluster = LocalCluster(scheduler_port=DASK_SCHEDULER_PORT, n_workers=4)
    print(cluster.scheduler_address)
    input()  # keep the cluster alive until Enter is pressed

tasks.py

import numpy as np

def _fib(n):
    if n < 2:
        return n
    else:
        return _fib(n - 1) + _fib(n - 2)

def func(value):
    return _fib(value)

value = 36

app.py

import asyncio
import time
import panel as pn
import numpy as np
from dask.distributed import Client
from cluster import DASK_SCHEDULER_ADDRESS
import tasks
import hvplot.pandas
import pandas as pd

pn.extension("terminal", sizing_mode="stretch_width", template="fast")

results = []
N_MIN = 0
N_MAX = 40

submit_button = pn.widgets.Button(name="Submit")
overhead_plot = pn.pane.HoloViews(height=400)
duration_plot = pn.pane.HoloViews(height=400)

def update_results(n, local_duration, cluster_duration):
    results.append(dict(n=n, local_duration=local_duration, cluster_duration=cluster_duration, overhead=cluster_duration-local_duration))

    df = pd.DataFrame(results).groupby("n").mean()
    
    overhead_plot.object = df.hvplot.line(y="overhead")
    duration_plot.object = df.hvplot.line(y=["local_duration", "cluster_duration"], ylabel="duration", xlim=(N_MIN, N_MAX))


async def get_client():
    if not "dask-client" in pn.state.cache:
        pn.state.cache["dask-client"] = await Client(
            DASK_SCHEDULER_ADDRESS, asynchronous=True
        )
    return pn.state.cache["dask-client"]

@pn.depends(submit_button, watch=True)
async def _click(_):
    submit_button.disabled=True
    results.clear()  # reset the module-level results list; "results = []" would only create an unused local
    
    for n in range(N_MIN, N_MAX):
        start = time.time()
        tasks.func(n)
        local_duration = time.time()-start

        client = await get_client()
        start = time.time()
        await client.submit(tasks.func, n)
        cluster_duration = time.time()-start
        
        update_results(n, local_duration, cluster_duration)
        
    submit_button.disabled=False

component = pn.Column(
    submit_button, overhead_plot, duration_plot
).servable()
I start the cluster and then serve the app with

python cluster.py
panel serve app.py
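To rule Panel out of the measurement, a standalone script could time the same submits directly against the cluster. A minimal sketch of that idea (the file name is made up, and pure=False is passed so repeated runs aren't served from a cached key):

# bench_overhead.py - sketch: time submit round trips without Panel in the loop
import asyncio
import time

from dask.distributed import Client

from cluster import DASK_SCHEDULER_ADDRESS
import tasks


async def main():
    client = await Client(DASK_SCHEDULER_ADDRESS, asynchronous=True)
    for n in (10, 20, 30, 34, 36):
        start = time.time()
        # pure=False gives each submit a fresh key, so nothing is deduplicated
        await client.submit(tasks.func, n, pure=False)
        print(f"n={n}: {time.time() - start:.3f}s round trip")
    await client.close()


if __name__ == "__main__":
    asyncio.run(main())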

Panel Issue

I’ve also opened an issue on the Panel side in case the problem lies there. See Issue #4239 · holoviz/panel (github.com)

Non-Recursive Function

I don’t see the same explosion in overhead if I change the submitted func to

import time

def func(value):
    return time.sleep(float(value) / 10)

Similarly, if I change the func to

def func(value):
    x = y = int(10000**(value/40))
    return np.random.random((x,y)).shape

I don’t see the same explosion.

Pure-Python functions that don’t release the GIL for tens of seconds don’t perform well on Dask, because the user function runs on one thread and the network comms of the worker run on another.
Hog the GIL long enough and the scheduler will decide that the worker died, and it will restart the work somewhere else.
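If the rescheduling is the immediate symptom, one band-aid (an assumption on my part that these config keys exist in your distributed version; check with dask.config.get first) is to raise the relevant timeouts in the process that starts the scheduler and workers:

import dask

# must run before the cluster is created, e.g. at the top of cluster.py
dask.config.set({
    "distributed.comm.timeouts.tcp": "90s",            # default is typically 30s
    "distributed.scheduler.worker-ttl": "10 minutes",  # grace period before an unresponsive worker is declared dead
})

This only hides the symptom; the GIL contention described above remains.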


You can change the executor on a worker from a thread pool to a process pool to work around this. Of course, you’ll then pay the overhead of IPC comms, which for very short tasks is going to be very large.
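For reference, a minimal sketch of what that could look like. It is untested and makes two assumptions: that LocalCluster forwards extra keyword arguments to the worker class, and that your distributed version accepts an executor argument on Worker.

# cluster_processes.py - sketch: workers whose task executor is a process pool
from concurrent.futures import ProcessPoolExecutor

from dask.distributed import LocalCluster, Worker

DASK_SCHEDULER_PORT = 64719

if __name__ == "__main__":
    cluster = LocalCluster(
        scheduler_port=DASK_SCHEDULER_PORT,
        n_workers=1,
        worker_class=Worker,  # in-process worker, so the executor instance can be passed directly
        # each task now runs in its own process, so _fib cannot starve the worker's comm thread
        executor=ProcessPoolExecutor(max_workers=4),
    )
    print(cluster.scheduler_address)
    input()  # keep the cluster alive until Enter is pressed

With this setup the heavy Fibonacci call runs in a child process and the worker's event loop stays responsive, at the price of pickling arguments and results across the process boundary on every task.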
