Client.submit() only running the code on one of the workers

Here’s my minimal setup.

I created a scheduler and 2 workers in separate terminals.

dask-scheduler
dask-worker tcp://127.0.0.1:8786 --nthreads=1 --name=w1
dask-worker tcp://127.0.0.1:8786 --nthreads=1 --name=w2

Then I run the following code. I was expecting to see the PID printed in both workers’ terminals, and an array of the PIDs returned in the calling process.
But what I observe is that it round-robins and only prints in one worker’s terminal each run, and the return only has one value. The behavior is the same with and without the worker names.

My Dask version is 2021.12.0.

import os

from distributed import Client


def f():
    print(os.getpid())
    return os.getpid()


if __name__ == "__main__":
    with Client(address='tcp://127.0.0.1:8786') as client:
        print(client.submit(f, workers=['w1', 'w2']).result())

@ubw218 Thanks for your question!

I was expecting in both of the 2 workers terminal I should see the PID printed.

You’re executing the function f() only once right now. The workers parameter only tells Dask that it may use w1 and w2 for this computation; it does not tell Dask to execute f() on both workers. You can check out the API docs here. Does this make sense?
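For example, restricting that single task to just w1 only pins it there; it still runs once (a minimal sketch reusing the f and client from your snippet, with the worker name from your setup):

with Client(address='tcp://127.0.0.1:8786') as client:
    # 'workers' constrains where this one task may run;
    # it does not fan the task out to every listed worker.
    future = client.submit(f, workers=['w1'])  # runs once, on w1
    print(future.result())                     # prints that worker's PID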

So, you’ll need to call client.submit(f) at least twice to run it on both workers. If you do this using a for-loop, you’ll still notice that Dask runs it on just one worker. This is because Dask expects functions to be “pure” by default – meaning they produce the same output when given the same input, which is not the case in your example. So, you need to pass pure=False to change this:

if __name__ == "__main__":
    results = []
    with Client(address='tcp://127.0.0.1:8786') as client:
        for _ in range(2):
            results.append(client.submit(f, pure=False))
        print(client.gather(results))
        
# Output:
# [7606, 7789]

Also, note that the workers parameter isn’t needed here because we’re using all the available workers, which is what Dask does by default.

Let me know if this helps!


This is really helpful indeed! I didn’t realize this from reading the docs. Is there a broadcast version of this function? One intended use is to gather memory usage from the workers and track the maximum. (I know the dashboard does that, but I didn’t find an API to get the info.) Another use is to configure logging programmatically.

@ubw218 I think you might be looking for Worker Plugins?
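For example, a worker plugin’s setup() runs on every worker (including workers that join later) once the plugin is registered, so it’s a good place for per-worker setup such as configuring logging. Here’s a minimal sketch; the ConfigureLogging class and the logging format are just illustrative, not an official recipe:

import logging

from distributed import Client, WorkerPlugin


class ConfigureLogging(WorkerPlugin):
    def setup(self, worker):
        # Runs inside each worker process when the plugin is registered.
        logging.basicConfig(
            level=logging.INFO,
            format=f"%(asctime)s {worker.name} %(levelname)s %(message)s",
        )


if __name__ == "__main__":
    with Client(address='tcp://127.0.0.1:8786') as client:
        client.register_worker_plugin(ConfigureLogging())

Registering the plugin pushes it to all current workers (and any that connect afterwards), so you don’t need to submit one task per worker.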