Only one worker runs when the DAG forks

Hello,

I have three interconnected nodes:

  • Node-A hosts the scheduler.
  • Node-B hosts one Dask worker, started with a custom worker resource of 128 "cores".
  • Node-C also hosts one Dask worker with the same 128-"cores" resource (a single-machine sketch of an equivalent setup follows this list).
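
In case it helps to reproduce this on a single machine, here is a minimal sketch of an equivalent two-worker setup. This is only an illustration: it assumes LocalCluster forwards the resources= keyword to each worker, whereas on the real cluster the "cores" resource is declared when each worker process is started.

from dask.distributed import Client, LocalCluster

if __name__ == "__main__":
    # two local workers standing in for Node-B and Node-C, each advertising
    # the same custom "cores" resource as the real workers
    cluster = LocalCluster(n_workers=2, resources={"cores": 128})
    client = Client(cluster)
    print(client.scheduler_info()["workers"])  # should list both workers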

The following code does not distribute the hyperparameter computation across nodes B and C, and I would like to understand why. Is it due to suboptimal scheduler decisions, or is there a programming error on my part?

from dask.distributed import Client
import dask
import time
import os
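# run worker processes as non-daemonic (so tasks can spawn subprocesses if needed)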
dask.config.set({'distributed.worker.daemon': False})

@dask.delayed
def reading():
    print(f"reading() on {os.environ.get('HOSTNAME')} ...")
    time.sleep(2)
    print(f"reading() done")
    return 0

@dask.delayed
def train(x, h):
    print(f"train(x,{h}) on {os.environ.get('HOSTNAME')} ...")
    time.sleep(2)
    print(f"train(x,{h}) done")
    return x+h

if __name__ == "__main__":
    client = Client("localhost:8786")

    with dask.annotate(resources={"cores": 1}):
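        # reading() only claims 1 unit of the custom "cores" resource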
        reading_op = reading()

    training_ops = []

    hyperparameters = [4, 8, 16, 32]
    for h in hyperparameters:
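        # each train() claims 100 "cores", so a worker exposing 128 can run only one at a time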
        with dask.annotate(resources={"cores": 100}):
            train_op = train(reading_op, h)  # the work is distributed across both workers if I pass the literal 0 instead of reading_op
            training_ops.append(train_op)

    start_total_time = time.time()
    out = dask.compute(*training_ops)
    print(out)
    print("compute time:", time.time()-start_total_time)

    client.close()

The two workers do run train() in parallel as soon as I remove the dependency on the reading operation.
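
For reference, here is the variant of the loop that does spread across both workers, with everything else in the script unchanged; the only difference is that the literal 0 is passed in place of reading_op, so there is no shared upstream task:

training_ops = []
for h in hyperparameters:
    with dask.annotate(resources={"cores": 100}):
        # no dependency on reading_op, just the literal 0
        training_ops.append(train(0, h))

out = dask.compute(*training_ops)  # both workers run train() in parallel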

Looks like a scheduler bug.
