Which client is being used?

If I have code like this in my notebook:

from dask.distributed import Client, LocalCluster
import dask.array as da
import numpy as np
from scipy import signal

cluster = LocalCluster(n_workers=1, threads_per_worker=6) 
client = Client(cluster)

d = np.random.randn(100,20)
x = da.from_array(d, chunks=(100, 1))
A = x.map_blocks(signal.decimate, q=5, axis=0, chunks=(20, 1), meta=np.array((), dtype=d.dtype)).compute()  # decimating by 5 shrinks each (100, 1) block to (20, 1)

How do I know which Client object map_blocks is using? It seems it's not possible to directly specify that I want it to use the cluster I set up just above. Maybe it uses it automatically in the background? If so, how?
The reason for my question is that it's quite important that I can specify the number of available processes, so that I can manage memory and be sure I will not run out of it.

Thanks for the question, @benja! Dask will automatically use your LocalCluster here because, according to the docs, when you initialize your Client it registers itself as the default scheduler, so it is the one that runs your Dask Array computations.
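A minimal sketch for checking this yourself, assuming a recent dask.distributed: when called outside of a task, get_client() falls back to the current default client, so it should be the very object created above. The flag processes=False (workers run as threads in the current process) is used only to keep the snippet self-contained; it is not needed in a notebook.

```python
from dask.distributed import Client, LocalCluster, get_client
import dask.array as da

cluster = LocalCluster(n_workers=1, threads_per_worker=2, processes=False)
client = Client(cluster)  # registers itself as the default scheduler

# Outside of a task, get_client() returns the current default client
uses_our_client = get_client() is client

# A plain .compute() now runs on this cluster without mentioning the client
x = da.ones((10, 10), chunks=(5, 5))
total = x.sum().compute()

client.close()
cluster.close()
```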

If you want to specify a client explicitly, you can use client.compute(...) or client.persist(...), so in your case:

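t = x.map_blocks(signal.decimate, q=5, axis=0, chunks=(20, 1), meta=np.array((), dtype=d.dtype))  # only builds the task graph
future = client.compute(t)  # the computation starts on the cluster here
A = future.result()  # blocks until finished and returns the NumPy array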
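To make the difference between the two explicit methods concrete, here is a small sketch on a toy array (not the decimation example): client.compute returns a Future whose .result() gathers the NumPy array, while client.persist returns a Dask array whose chunks stay in worker memory for further work. As before, processes=False is only there to keep the snippet self-contained.

```python
from dask.distributed import Client, LocalCluster
import dask.array as da
import numpy as np

cluster = LocalCluster(n_workers=1, threads_per_worker=2, processes=False)
client = Client(cluster)

x = da.arange(20, chunks=5)
y = x * 2  # lazy: only the task graph is built

# compute: returns a Future; .result() waits and brings the data back locally
future = client.compute(y)
result = future.result()

# persist: starts the work on the cluster and returns a dask array
# backed by chunks held in worker memory
y_persisted = client.persist(y)
persisted_sum = int(y_persisted.sum().compute())

client.close()
cluster.close()
```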

Does this help answer your question?

Also, as a side note, the following is an equivalent way to initialize the Client with a LocalCluster (reference docs):

from dask.distributed import Client
client = Client(n_workers=1, threads_per_worker=6)
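Since the original concern was memory, a sketch of the same shorthand with a per-worker memory budget: n_workers, threads_per_worker and memory_limit are forwarded to LocalCluster, and client.nthreads() reports what actually started. The values 2 workers, 1 thread each and "2GB" are illustrative assumptions. processes=False keeps the snippet self-contained; dropping it gives one OS process per worker, which is what isolates memory between workers (in a script, put that variant under if __name__ == "__main__":).

```python
from dask.distributed import Client

client = Client(
    n_workers=2,          # number of workers
    threads_per_worker=1,
    memory_limit="2GB",   # memory budget for each worker
    processes=False,      # threads in this process, only for a self-contained demo
)
client.wait_for_workers(2)

# Maps each worker address to its number of threads
nthreads = client.nthreads()
n_workers = len(nthreads)
total_threads = sum(nthreads.values())

client.close()  # also shuts down the LocalCluster the client created
```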

Additionally, you can call dask.compute(..., scheduler=...), and there are context managers, such as dask.config.set, for setting the scheduler temporarily. This is useful when you want to switch between the threaded, synchronous (for debugging), and distributed schedulers.
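A short sketch of both options using the local schedulers, so no cluster is needed: the scheduler keyword picks a scheduler for one call, and dask.config.set(scheduler=...) used as a context manager switches it only inside the with block.

```python
import dask
import dask.array as da

x = da.arange(10, chunks=5)
total = x.sum()  # lazy

# Pick a scheduler for a single call; dask.compute returns a tuple
(threaded,) = dask.compute(total, scheduler="threads")
(synchronous,) = dask.compute(total, scheduler="synchronous")  # single-threaded, handy with a debugger

# Temporarily switch the scheduler for everything inside the block
with dask.config.set(scheduler="synchronous"):
    in_context = total.compute()
```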


Thank you for the answers.
