I already set cores=1. The weird thing is that if I use only 10 nodes, all workers get some tasks to process.
Is there an easy way to force one worker to handle only one task? Thanks.
I also tried manually adding an abstract resource to the SGECluster initialization by adding "setenv DASK_DISTRIBUTED__WORKER__RESOURCES__FOO 1" to job_script_prologue, but it doesn't work: none of the tasks will start. I'm not sure whether I added it in the right place.
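For reference, this is roughly how I believe abstract resources are usually wired up (a sketch only; run_simulation, nshots, and the memory value are placeholders for my real code, and the export line assumes the job script runs under bash rather than csh):

import time
from dask_jobqueue import SGECluster
from dask.distributed import Client

def run_simulation(shot):
    # placeholder for the Devito wave-simulation call
    return shot

nshots = 50

# Advertise one unit of an abstract resource "FOO" on every worker through its
# environment, then request one unit per task so each worker runs one task at a time.
cluster = SGECluster(
    cores=1,
    memory="64GB",  # placeholder value
    job_script_prologue=["export DASK_DISTRIBUTED__WORKER__RESOURCES__FOO=1"],
)
client = Client(cluster)
cluster.scale(10)

futures = [client.submit(run_simulation, shot, resources={"FOO": 1})
           for shot in range(nshots)]
results = client.gather(futures)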
@guillaumeeb Thank you very much for looking into this!
To scale the workers, I use cluster.scale(nnodes*ph), where nnodes is the number of nodes/jobs and ph is the number of workers per node. In the processing workflow, each worker runs wave simulations by calling the Devito package; there are 50 tasks, handling wave simulations originating from 50 different source locations.
I also added the following lines to ensure the correct number of workers is allocated before starting the processing workflow:
import time

# Block until all nnodes * ph workers have registered with the scheduler.
while client.status == "running" and len(client.scheduler_info()['workers']) / ph < nnodes:
    time.sleep(1.0)
client.wait_for_workers(ph * nnodes)
Unfortunately, when using LocalCluster, I am seeing the same issue.
This Monday I figured out a workaround: get all the worker addresses, and then submit each task to a specific worker manually:
futures = []
worker_address_list = [ws.address for ws in client.cluster.scheduler.workers.values()]
L = len(worker_address_list)
for i in range(nshots):
    futures.append(client.submit(..., workers=[worker_address_list[i % L]]))
So far it works, but it's not ideal. If we want to use adapt to release node resources when some tasks finish early, there is no way for Dask to schedule tasks automatically to achieve this (Dask will put all 50 tasks on a single worker and decide not to allocate more workers at the very beginning). So I really need help figuring out why Dask's automatic scheduling doesn't work here. Thanks.
In our case, a task takes around 1.5 hours. If we use 300 nodes (the maximum resources available) to handle 901 tasks, that is three full rounds of 300 tasks plus a single leftover task, so 299 nodes will sit idle for 1.5 hours during the final batch.
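For what it's worth, this is the kind of adaptive setup we would like to rely on (a sketch only, using LocalCluster and a dummy run_simulation as placeholders for the real SGE setup and the Devito call):

import time
from dask.distributed import Client, LocalCluster

def run_simulation(shot):
    time.sleep(1.0)   # stand-in for the ~1.5 h Devito simulation
    return shot

cluster = LocalCluster(n_workers=1, threads_per_worker=1)
cluster.adapt(minimum=1, maximum=50)   # grow/shrink the worker pool as tasks finish
client = Client(cluster)

# With adapt, tasks should not be pinned to specific worker addresses: a pinned
# worker may be retired before its queued tasks ever run.
futures = client.map(run_simulation, range(50))
results = client.gather(futures)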
@guillaumeeb I have removed all the business logic, leaving the minimal code needed for reproducibility. Please check and see if you can reproduce the issue:
I ran 4 tests:

1. LocalCluster() without specifying threads_per_worker: all 50 tasks go to one worker in the first round (bad!)
2. LocalCluster() with threads_per_worker=1: all workers get at least one task to handle (good!)
3. SGECluster() scaled to 10 workers (= 10 nodes, one worker per node): all workers get at least one task to handle (good!)
4. SGECluster() scaled to 25 workers (= 25 nodes, one worker per node): all 50 tasks go to one worker in the first round (bad!)
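In case it helps, here is a stripped-down stand-in for tests 1 and 2 (a sketch; fake_shot just replaces the Devito call):

import time
from dask.distributed import Client, LocalCluster

def fake_shot(i):
    time.sleep(5)          # stand-in for the real wave simulation
    return i

# Test 2: one thread per worker, and tasks spread across all workers.
# Dropping threads_per_worker=1 reproduces test 1, where one worker hoards the queue.
cluster = LocalCluster(n_workers=10, threads_per_worker=1)
client = Client(cluster)

futures = client.map(fake_shot, range(50))
print(client.gather(futures))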
We plan to scale to 400+ nodes in our cluster. Thanks.
You are building two 8 GiB arrays on the client side; this is huge!
dm_in_future = client.scatter(dm_in)
You are scattering one of them to the workers, which is quite a big data transfer. You should try another way to load your model onto your workers.
Since the Active Memory Manager is now enabled by default, I wonder if it could be the cause of your task-distribution problem, because the AMM will try to keep the number of replicas of the scattered model as low as possible.
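If you want to rule the AMM out, one quick test (just a sketch; I believe the relevant config key is distributed.scheduler.active-memory-manager.start) is to disable it before creating the cluster:

import dask
from dask.distributed import Client, LocalCluster

# Turn the Active Memory Manager off for this test run only.
dask.config.set({"distributed.scheduler.active-memory-manager.start": False})

client = Client(LocalCluster(n_workers=10, threads_per_worker=1))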
Looking at the .yml file, the dask version is 22.7.0. For the test job, each worker uses ~40 GB of memory (this is actually a small memory footprint; typically we have to handle 300 GB+ of memory usage per job) and 24 threads (there are some issues with Devito; we would expect each worker to use 112 threads on 112 cores).
What do you suggest for passing big-memory inputs to each worker? 8 GB is not big for our workload. One alternative I can think of is to write the input to the filesystem and then have all the workers read it from there instead.
In the end we decided to read from a central file on the filesystem rather than scatter the big input to multiple workers. Now adapt works, we don't have to manually pin tasks to specific worker addresses, and each worker gets at least one task from the very beginning.
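For anyone who hits this later, the shape of that solution is roughly the following (a sketch; MODEL_PATH and run_shot are placeholder names, and the real cluster is an SGECluster rather than a LocalCluster):

import numpy as np
from dask.distributed import Client, LocalCluster

MODEL_PATH = "/shared/scratch/dm_in.npy"   # placeholder path on the shared filesystem

def run_shot(shot_id):
    # Each task loads the model from the shared file instead of receiving an
    # 8 GB scattered array from the client.
    dm_in = np.load(MODEL_PATH)
    # ... run the wave simulation for this source location using dm_in ...
    return shot_id

client = Client(LocalCluster(n_workers=4, threads_per_worker=1))
futures = client.map(run_shot, range(50))
results = client.gather(futures)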