How to get each worker to process only one task when using SGECluster

Hi, I am using SGECluster to start Dask workers. All tasks get squeezed onto a single node (50 tasks, 50 workers/nodes):

from dask_jobqueue import SGECluster

cluster = SGECluster(
    nanny=False,
    processes=ph,  # number of workers per job
    cores=ph,  # make sure nthreads == 1: each Dask worker gets one thread
    scheduler_options=dict(dashboard_address=':0'),
    job_extra_directives=[
        '-pe {} {}'.format(pe, cores_per_node),
        '-cwd',
        '-S /bin/csh',
        '-j y',
        '-o {}'.format(odaskdir + '/'),
    ],
    job_script_prologue=[
        'setenv DEVITO_LOGGING DEBUG',
        'setenv DEVITO_LANGUAGE openmp',
        f'setenv OMP_NUM_THREADS {nthreads}',
    ],
    memory=memory_per_node,
    project=project_name,
    walltime=runtime_limit,
)

I already set “cores=1”. The weird thing is that if I use only 10 nodes, every worker gets some tasks to process.

Is there an easy way to force one worker to handle only one task? Thanks.

I also tried manually adding an abstract resource when initializing SGECluster by putting ‘setenv DASK_DISTRIBUTED__WORKER__RESOURCES__FOO 1’ in job_script_prologue, but it doesn’t work: none of the tasks start. I am not sure whether I added it in the right place.

Hi @llodds,

Can you reproduce this behavior with LocalCluster?

You shouldn’t have to do that.

What is your workflow after starting the cluster? How do you scale Workers?

@guillaumeeb Thank you very much for looking into this –

To scale the workers, I use cluster.scale(nnodes*ph), where nnodes is the number of nodes/jobs and ph is the number of workers per node. In the processing workflow, each worker runs a wave simulation by calling the Devito package. There are 50 tasks, one wave simulation for each of 50 different source locations.

I also added the following lines to ensure the correct number of workers is allocated before starting the processing workflow:

while client.status == "running" and len(client.scheduler_info()['workers']) / ph < nnodes:
    time.sleep(1.0)
client.wait_for_workers(ph * nnodes)

Unfortunately, when using LocalCluster, I am seeing the same issue:


On Monday I figured out a workaround: get all the worker addresses, then submit each task to a specific worker manually:

worker_address_list = [ws.address for ws in client.cluster.scheduler.workers.values()]
L = len(worker_address_list)
for i in range(nshots):
    futures.append(client.submit(..., workers=[worker_address_list[i % L]]))

So far it works, but it is not ideal. If we want to use adapt to free up nodes when some tasks finish early, Dask has no way to schedule tasks automatically to achieve this (Dask puts all 50 tasks on a single worker and decides at the very beginning not to allocate more workers). So I really need help figuring out why Dask's automatic scheduling doesn’t work here. Thanks.

In our case, a task takes around 1.5 hours. If we use 300 nodes (the maximum available) to handle 901 tasks, 299 nodes will sit idle for 1.5 hours during the final batch.
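The idle time in that scenario can be checked with a little arithmetic. This is only a sketch, assuming every task takes the same time, each node runs one task at a time, and the allocation stays fixed for the whole run; the function name `final_batch_idle` is made up for illustration:

```python
import math


def final_batch_idle(n_tasks, n_nodes, hours_per_task):
    """Return (rounds, idle nodes in the last round, idle node-hours)."""
    # Number of batches needed when each node runs one task at a time.
    rounds = math.ceil(n_tasks / n_nodes)
    # Tasks left over for the final batch.
    last_round_tasks = n_tasks - (rounds - 1) * n_nodes
    idle_nodes = n_nodes - last_round_tasks
    return rounds, idle_nodes, idle_nodes * hours_per_task


print(final_batch_idle(901, 300, 1.5))  # (4, 299, 448.5)
```

With these numbers the last of four batches holds a single task, so 299 nodes wait for roughly 448.5 node-hours in total.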

This is not really unfortunate; it will be much easier to investigate! Could you provide a complete reproducer? This might be an issue on the Dask side.

Which Dask version are you using?

Yes, this is not acceptable, and this is not the normal behavior of Dask.

@guillaumeeb I have removed all the business content leaving the minimal stuff for reproducibility. Please check and see if you can reproduce the issue:

I did 4 tests:

  1. use LocalCluster() without specifying threads_per_worker=: all 50 tasks go to one worker in the first round (bad!)
  2. use LocalCluster() and specify threads_per_worker=1: all workers get at least one task to handle (good!)
  3. use SGECluster() and scale to 10 workers (=10 nodes, one worker/node): all workers get at least one task to handle (good!)
  4. use SGECluster() and scale to 25 workers (=25 nodes, one worker/node): all 50 tasks go to one worker in the first round (bad!)
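One way to compare runs like these is to have every task report which worker executed it and then count tasks per worker. The following is only a sketch, assuming an in-process LocalCluster and a hypothetical stand-in task `simulate_shot` (not the real Devito workload); it measures the distribution and does not by itself reproduce the problem:

```python
from collections import Counter
import time

from dask.distributed import Client, LocalCluster, get_worker


def simulate_shot(shot_id):
    """Hypothetical stand-in for one long-running simulation task."""
    time.sleep(0.2)
    # Report the address of the worker that actually ran this task.
    return get_worker().address


def tasks_per_worker(n_workers=4, n_tasks=8):
    """Run n_tasks on single-threaded workers and count tasks per worker."""
    with LocalCluster(n_workers=n_workers, threads_per_worker=1,
                      processes=False, dashboard_address=None) as cluster:
        with Client(cluster) as client:
            client.wait_for_workers(n_workers)
            # pure=False gives every call its own key, so nothing is deduplicated.
            futures = client.map(simulate_shot, range(n_tasks), pure=False)
            return Counter(client.gather(futures))


if __name__ == "__main__":
    for address, count in tasks_per_worker().items():
        print(address, count)
```

Dropping `threads_per_worker=1` from the sketch would correspond to test 1, so the two counts can be compared side by side.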

We plan to scale to 400+ nodes in our cluster. Thanks.

@llodds, in the GitHub link I only see an HTML page. Could you put a .py or .ipynb file in there instead?

It’s weird that you seem to get a somewhat random task distribution…

@guillaumeeb Thanks for looking –
I just added the .ipynb file.

I just tried to execute the code on my laptop with a LocalCluster, but it crashed my notebook (and froze my computer for 5 minutes).

I didn’t look at the content first. Looking at it after that, I notice that:

nxmodel = 1271
nymodel = 1299
nzmodel = 675
dm_in = np.zeros([nxmodel,nymodel,nzmodel])
image_up_dev = np.zeros([nxmodel,nymodel,nzmodel]) 

You are building two 8GiB arrays on the client side, this is huge!
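As a rough check of that figure, the size of one array follows directly from its shape, assuming the default float64 dtype of np.zeros (8 bytes per value):

```python
nxmodel, nymodel, nzmodel = 1271, 1299, 675
bytes_per_value = 8  # np.zeros defaults to float64

# Total bytes for one array of this shape.
n_bytes = nxmodel * nymodel * nzmodel * bytes_per_value
size_gib = n_bytes / 2**30

print(f"{size_gib:.2f} GiB per array")  # 8.30 GiB per array
```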

dm_in_future = client.scatter(dm_in)

You are scattering one of them to the workers, which is quite a big data transfer. You should try another way to load your model on your workers.

Since the Active Memory Manager is now enabled by default, I wonder whether it could be the cause of your task distribution problem, since the AMM will try to keep the number of replicas of the scattered model as low as possible.

cc @crusaderky.

It might also depend on your Dask cluster configuration: how big are your workers (memory, number of threads)?

Also, what Dask version are you using?

@guillaumeeb Sorry for the late response –

Looking at the .yml file, the Dask version is 22.7.0. For the test job, each worker uses ~40GB of memory (this is actually a small memory footprint; typically we have to handle 300GB+ of memory per job) and 24 threads (there are some issues with Devito; we would expect each worker to use 112 threads on 112 cores).

What do you suggest for getting large inputs to each worker? 8GB is not big for our workload. One alternative I can think of is to write the input to the filesystem and have all the workers read it from there instead.
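The filesystem alternative could look roughly like the sketch below. It assumes a directory that every worker can read (a temporary directory stands in for it here), and the names `save_model`, `process_shot` and `dm_in.npy` are made up for illustration; each task receives only the path and opens the file itself:

```python
import os
import tempfile

import numpy as np


def save_model(model, directory):
    """Write the model once and return the path of the saved file."""
    path = os.path.join(directory, "dm_in.npy")  # hypothetical file name
    np.save(path, model)
    return path


def process_shot(shot_id, model_path):
    """Hypothetical task: load the model from disk inside the task."""
    # mmap_mode="r" maps the file read-only instead of reading it all up front.
    model = np.load(model_path, mmap_mode="r")
    return shot_id, model.shape, float(model.sum())


if __name__ == "__main__":
    # Tiny array and temporary directory, used only to demonstrate the round trip.
    with tempfile.TemporaryDirectory() as shared_dir:
        path = save_model(np.ones((2, 3, 4)), shared_dir)
        print(process_shot(0, path))
```

With Dask, the task would then be submitted with the path rather than the array, for example client.submit(process_shot, i, path), so only a short string travels through the scheduler.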

Thanks again for looking into this.