Hello,
I have three interconnected nodes:
- Node-A hosts the scheduler.
- Node-B hosts one Dask worker, started with a custom resource "cores": 128.
- Node-C also hosts one Dask worker, with the same "cores": 128 resource (a quick check of this setup is sketched below).
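For completeness, this is roughly how I confirm that both workers and their "cores" resources are registered with the scheduler (the address matches the one used in the script below; the expected output is my assumption about what a healthy setup looks like):

from dask.distributed import Client

# Connect to the scheduler on Node-A and list the registered workers
# together with their declared resources.
client = Client("localhost:8786")
for addr, info in client.scheduler_info()["workers"].items():
    print(addr, info.get("resources"))  # I expect {'cores': 128} for both B and C
client.close()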
The following code does not distribute the hyperparameter computations between nodes B and C, and I would like to understand why. Is it due to suboptimal scheduler choices, or is there a programming error on my part?
from dask.distributed import Client
import dask
import time
import os

dask.config.set({'distributed.worker.daemon': False})

@dask.delayed
def reading():
    print(f"reading() on {os.environ.get('HOSTNAME')} ...")
    time.sleep(2)
    print("reading() done")
    return 0

@dask.delayed
def train(x, h):
    print(f"train(x,{h}) on {os.environ.get('HOSTNAME')} ...")
    time.sleep(2)
    print(f"train(x,{h}) done")
    return x + h

if __name__ == "__main__":
    client = Client("localhost:8786")

    # Single shared input task, annotated to need 1 "cores" resource.
    with dask.annotate(resources={"cores": 1}):
        reading_op = reading()

    # One train task per hyperparameter, each annotated to need 100 of the
    # 128 "cores", so each worker should only run one of them at a time.
    training_ops = []
    hyperparameters = [4, 8, 16, 32]
    for h in hyperparameters:
        with dask.annotate(resources={"cores": 100}):
            train_op = train(reading_op, h)  # the compute is well distributed if I replace reading_op with a literal 0
        training_ops.append(train_op)

    start_total_time = time.time()
    out = dask.compute(*training_ops)
    print(out)
    print("compute time:", time.time() - start_total_time)

    client.close()
The two workers do run train() in parallel as soon as I remove the dependency on the reading operation.
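To be concrete, this is roughly the variant I mean (same script as above, reusing the train() definition and the client; only the first argument to train() changes from reading_op to a literal 0):

hyperparameters = [4, 8, 16, 32]
training_ops = []
for h in hyperparameters:
    with dask.annotate(resources={"cores": 100}):
        # No shared reading_op dependency: each train task takes a literal 0.
        training_ops.append(train(0, h))

out = dask.compute(*training_ops)  # with this version, the four tasks spread over both workers
print(out)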