Cannot run client.run(function) when function contains get_worker() in distributed==2023.3.2.1

Hi, here is an example detailing my problem when using distributed==2023.3.2.1:

from dask.distributed import Client, get_worker

def gworkers():
    print(get_worker())
    return 0

if __name__ == "__main__":

    n_workers = 5
    client = Client(n_workers=n_workers)
    client.run(gworkers)
    client.close()

I get the following error:

2023-04-14 15:51:27,523 - distributed.worker - WARNING - Run Failed
Function: gworkers
args:     ()
kwargs:   {}
Traceback (most recent call last):
  File "/home/danielzh/Documents/Programs/infretis/examples/basic/d_new/lib/python3.8/site-packages/distributed/worker.py", line 3288, in run
    result = function(*args, **kwargs)
  File "dump.py", line 4, in gworkers
    print(get_worker())
  File "/home/danielzh/Documents/Programs/infretis/examples/basic/d_new/lib/python3.8/site-packages/distributed/worker.py", line 2714, in get_worker
    raise ValueError("No worker found") from None
ValueError: No worker found
Traceback (most recent call last):
  File "dump.py", line 11, in <module>
    client.run(gworkers)
  File "/home/danielzh/Documents/Programs/infretis/examples/basic/d_new/lib/python3.8/site-packages/distributed/client.py", line 2965, in run
    return self.sync(
  File "/home/danielzh/Documents/Programs/infretis/examples/basic/d_new/lib/python3.8/site-packages/distributed/utils.py", line 349, in sync
    return sync(
  File "/home/danielzh/Documents/Programs/infretis/examples/basic/d_new/lib/python3.8/site-packages/distributed/utils.py", line 416, in sync
    raise exc.with_traceback(tb)
  File "/home/danielzh/Documents/Programs/infretis/examples/basic/d_new/lib/python3.8/site-packages/distributed/utils.py", line 389, in f
    result = yield future
  File "/home/danielzh/Documents/Programs/infretis/examples/basic/d_new/lib/python3.8/site-packages/tornado/gen.py", line 769, in run
    value = future.result()
  File "/home/danielzh/Documents/Programs/infretis/examples/basic/d_new/lib/python3.8/site-packages/distributed/client.py", line 2870, in _run
    raise exc
  File "dump.py", line 4, in gworkers
    print(get_worker())
  File "/home/danielzh/Documents/Programs/infretis/examples/basic/d_new/lib/python3.8/site-packages/distributed/worker.py", line 2714, in get_worker
    raise ValueError("No worker found") from None
ValueError: No worker found

However, with distributed==2023.2.0, the script runs fine:

<Worker 'tcp://127.0.0.1:34305', name: 4, status: running, stored: 0, running: 0/3, ready: 0, comm: 0, waiting: 0>
<Worker 'tcp://127.0.0.1:36971', name: 3, status: running, stored: 0, running: 0/3, ready: 0, comm: 0, waiting: 0>
<Worker 'tcp://127.0.0.1:40035', name: 0, status: running, stored: 0, running: 0/3, ready: 0, comm: 0, waiting: 0>
<Worker 'tcp://127.0.0.1:43949', name: 1, status: running, stored: 0, running: 0/3, ready: 0, comm: 0, waiting: 0>
<Worker 'tcp://127.0.0.1:46601', name: 2, status: running, stored: 0, running: 0/3, ready: 0, comm: 0, waiting: 0>

I want all the workers to run the same function (one that contains get_worker), similar to this post: How to let all worker do same task in dask? - Stack Overflow

However, with the new distributed update, the answer to that Stack Overflow question no longer seems to work. Can anyone help me with this problem?

get_worker changed so that it only returns a worker when it is called from within the worker’s thread pool, i.e. from normal tasks, as opposed to actors or client.run (the latter runs in the event loop’s thread).

fsspec’s dask FS backend was also hit by this. We now use:

from distributed.worker import Worker

def _in_worker():
    return bool(Worker._instances)
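
For the original use case (running the same function on every worker), one option that avoids get_worker altogether is the dask_worker keyword documented for Client.run: if the function takes a parameter with that exact name, it is filled in with the worker the function runs on. The sketch below is only an illustration under that assumption; describe_worker and run_on_all_workers are hypothetical names, and I have not checked it against every distributed release.

```python
def describe_worker(dask_worker):
    # `dask_worker` is a special parameter name: Client.run populates it
    # with the Worker instance this function is executing on, so no call
    # to get_worker() is needed.
    return {"address": dask_worker.address, "name": dask_worker.name}


def run_on_all_workers(n_workers=5):
    # Hypothetical helper. The import is done lazily so that
    # describe_worker can be exercised without starting a cluster.
    from dask.distributed import Client

    client = Client(n_workers=n_workers)
    try:
        # client.run executes the function once on every worker and
        # returns a dict keyed by worker address.
        return client.run(describe_worker)
    finally:
        client.close()
```

As in the original script, run_on_all_workers() should be called from under an if __name__ == "__main__": guard, since Client(n_workers=...) starts worker processes.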