Which client is being used?

If I have code like this in my notebook:

from dask.distributed import Client, LocalCluster
import dask.array as da
import numpy as np
from scipy import signal

cluster = LocalCluster(n_workers=1, threads_per_worker=6) 
client = Client(cluster)

d = np.random.randn(100,20)
x = da.from_array(d, chunks=(100, 1))
A = x.map_blocks(signal.decimate, q=5, axis=0, meta=np.array((), dtype=d.dtype)).compute()

How do I know which Client object map_blocks is using? It seems like it's not possible to directly specify that I want it to use the cluster I set up just above. Maybe it uses it automatically in the background? If so, how?
The reason for my question is that it's quite important that I can specify the number of available processes, so that I can manage memory and be sure I will not run out of it.

Thanks for the question, @benja! Dask will automatically use your LocalCluster here. According to the docs, when you initialize a Client it registers itself as the default Dask scheduler, so any later .compute() call, including the one on your Dask Array, runs on it.
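
If you want to confirm which client is the default, one way is to ask for the current client and compare it with the one you created. The snippet below is only a minimal sketch: it assumes a small in-process cluster (processes=False, 2 threads) to keep it lightweight, which is not the setup from the question, and the variable names are illustrative.

```python
from dask.distributed import Client, LocalCluster

# Assumption: a small in-process cluster, just to keep the sketch lightweight.
cluster = LocalCluster(n_workers=1, threads_per_worker=2, processes=False)
client = Client(cluster)

# Client.current() returns the client that Dask will use by default.
current = Client.current()
same_client = current is client

# nthreads() maps each worker address to its number of threads.
threads_per_worker = client.nthreads()
total_threads = sum(threads_per_worker.values())

print("default client is ours:", same_client)
print("threads per worker:", threads_per_worker)

client.close()
cluster.close()
```

Checking client.nthreads() like this is also a quick way to verify that the worker and thread counts match what you asked for.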

If you want to specify a client explicitly, you can use client.compute(...) or client.persist(...), so in your case:

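t = x.map_blocks(signal.decimate, q=5, axis=0, meta=np.array((), dtype=d.dtype))  # only generates the task graph
future = client.compute(t)  # submits the graph to this client and returns a Future
A = future.result()  # blocks until the computation finishes and returns the array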
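
One detail worth knowing: client.compute(...) returns a Future rather than the computed array, so you call .result() on it to get the values. Here is a minimal sketch of that, assuming a small in-process client and a simple doubling function in place of signal.decimate (both are stand-ins chosen for illustration):

```python
from dask.distributed import Client, Future
import dask.array as da
import numpy as np

# Assumption: a small in-process client, only for this sketch.
client = Client(n_workers=1, threads_per_worker=2, processes=False)

data = np.arange(12.0).reshape(4, 3)
x = da.from_array(data, chunks=(4, 1))

# Lazy: this only builds the task graph.
t = x.map_blocks(lambda block: block * 2)

# Submit the graph to this specific client; returns immediately with a Future.
future = client.compute(t)
is_future = isinstance(future, Future)

# Block until the work is done and fetch the result as a NumPy array.
A = future.result()
print(type(A), A.shape)

client.close()
```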

Does this help answer your question?

Also, as a side note, the following is an equivalent way to initialize the Client with a LocalCluster (reference docs):

from dask.distributed import Client
client = Client(n_workers=1, threads_per_worker=6)

Additionally, you can pass a scheduler explicitly with dask.compute(..., scheduler=...), and there are context managers for setting the scheduler temporarily. This is useful when you want to switch between the threaded, synchronous (debugging), and distributed schedulers.
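
To illustrate, here is a minimal sketch of the three ways of picking a scheduler for a single computation. It uses only the local "synchronous" and "threads" schedulers and a toy array, so no cluster is needed; the array and variable names are just for illustration.

```python
import dask
import dask.array as da
import numpy as np

x = da.from_array(np.arange(10.0), chunks=5)
total = x.sum()  # lazy

# 1. Per-call keyword on the collection's own compute().
r1 = total.compute(scheduler="synchronous")  # single-threaded, handy for debugging

# 2. Per-call keyword on dask.compute(), which returns a tuple of results.
(r2,) = dask.compute(total, scheduler="threads")

# 3. Temporary setting via a context manager.
with dask.config.set(scheduler="synchronous"):
    r3 = total.compute()

print(r1, r2, r3)
```

An explicit scheduler= argument takes precedence over the default, so this works the same way when a distributed Client is active.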


Thank you for the answers.
