I’m trying to write a how-to guide for Panel + Dask. I figured I would build on Dask's Async Web Server example, which submits a Fibonacci
function asynchronously.
Based on the Dask efficiency docs, I would expect the per-task overhead to be around 1 ms.
What I experience in practice is ~7 ms and, unfortunately, when n >= 32
the overhead starts increasing a lot.
Why does this happen? Is it as expected? How do I improve this situation?
cluster.py
# cluster.py
from dask.distributed import LocalCluster
# Fixed port so that other modules can build the scheduler address without
# having to discover it at runtime.
DASK_SCHEDULER_PORT = 64719
DASK_SCHEDULER_ADDRESS = f"tcp://127.0.0.1:{DASK_SCHEDULER_PORT}"

if __name__ == '__main__':
    # The context manager closes the scheduler and its workers once the
    # user presses Enter, instead of leaving cleanup to interpreter exit.
    with LocalCluster(scheduler_port=DASK_SCHEDULER_PORT, n_workers=4) as cluster:
        print(cluster.scheduler_address)
        # Block here to keep the cluster alive until Enter is pressed.
        input()
tasks.py
import numpy as np
def _fib(n):
if n < 2:
return n
else:
return _fib(n - 1) + _fib(n - 2)
def func(value):
    """Return the Fibonacci number for ``value`` by delegating to ``_fib``."""
    return _fib(value)


# NOTE(review): not referenced by ``func`` itself; kept because other code
# may import it.
value = 36
app.py
import asyncio
import time
import panel as pn
import numpy as np
from dask.distributed import Client
from cluster import DASK_SCHEDULER_ADDRESS
import tasks
import hvplot.pandas
import pandas as pd
# Configure Panel before any widget or pane is created.
pn.extension("terminal", sizing_mode="stretch_width", template="fast")

# Range of n values to benchmark: used as range(N_MIN, N_MAX) for the tasks
# and as the x-axis limits of the duration plot.
N_MIN = 0
N_MAX = 40

# Timing samples collected so far, one dict per measured n.
results = []

# Widgets and panes that make up the page.
submit_button = pn.widgets.Button(name="Submit")
overhead_plot = pn.pane.HoloViews(height=400)
duration_plot = pn.pane.HoloViews(height=400)
def update_results(n, local_duration, cluster_duration):
    """Record one timing sample and redraw both plots.

    Args:
        n: Input value the task was run with.
        local_duration: Seconds taken when the task ran in this process.
        cluster_duration: Seconds taken when the task was submitted to the
            Dask cluster and awaited.
    """
    results.append(
        dict(
            n=n,
            local_duration=local_duration,
            cluster_duration=cluster_duration,
            # Extra time the cluster round trip cost compared with running
            # the same call locally.
            overhead=cluster_duration - local_duration,
        )
    )
    # Several samples may exist for the same n; plot their mean.
    df = pd.DataFrame(results).groupby("n").mean()
    overhead_plot.object = df.hvplot.line(y="overhead")
    duration_plot.object = df.hvplot.line(
        y=["local_duration", "cluster_duration"],
        ylabel="duration",
        xlim=(N_MIN, N_MAX),
    )
async def get_client():
    """Return the asynchronous Dask client stored in ``pn.state.cache``.

    The client is created and connected to ``DASK_SCHEDULER_ADDRESS`` on the
    first call and stored under the ``"dask-client"`` key; later calls
    return the stored instance.
    """
    if "dask-client" not in pn.state.cache:
        pn.state.cache["dask-client"] = await Client(
            DASK_SCHEDULER_ADDRESS, asynchronous=True
        )
    return pn.state.cache["dask-client"]
@pn.depends(submit_button, watch=True)
async def _click(_):
    """Benchmark ``tasks.func`` locally and on the cluster for each n.

    For every n in ``range(N_MIN, N_MAX)`` the task is timed once in this
    process and once via ``client.submit``; each pair of timings is passed
    to ``update_results``. The submit button is disabled while the
    benchmark runs.

    Samples are appended to the module-level ``results`` list and are not
    cleared between clicks, so repeated clicks add further samples per n.
    """
    submit_button.disabled = True
    try:
        # The client is the same for every iteration, so fetch it once.
        client = await get_client()
        for n in range(N_MIN, N_MAX):
            # perf_counter is the clock intended for measuring short
            # intervals; time.time() can jump and may be coarse.
            start = time.perf_counter()
            # Synchronous call: nothing else runs on the event loop until
            # it returns.
            tasks.func(n)
            local_duration = time.perf_counter() - start

            start = time.perf_counter()
            await client.submit(tasks.func, n)
            cluster_duration = time.perf_counter() - start

            update_results(n, local_duration, cluster_duration)
    finally:
        # Re-enable the button even if the connection or a task failed.
        submit_button.disabled = False
# Page layout: the button on top, then the two plots; marked servable so
# that `panel serve` renders it.
component = pn.Column(submit_button, overhead_plot, duration_plot).servable()
python cluster.py
panel serve app.py