How to retrieve the requested number of cores?

Hi,
I would like to know if there is a single coherent way to retrieve the requested number of cores when creating any type of distributed cluster. This information should be retrieved from the client object. Examples:

cluster = LocalCluster(n_workers=4, threads_per_worker=1, processes=True)
client = Client(cluster)
client.get_ncores()  # This should return 4

cluster = HTCondorCluster(cores=2, processes=2,
                          memory="2GB", disk="0.1GB", ...)
cluster.scale(jobs=2)
client = Client(cluster)
client.get_ncores()  # This should return 4 (2 cores per job)

cluster = SSHCluster(
    ["scheduler", "worker1", "worker2"],
    worker_options={"nprocs": 2, "nthreads": 1, "memory_limit": "32GB"})
client = Client(cluster)
client.get_ncores()  # This should return 4 (2 cores per worker node)

And so on for any kind of cluster that can be used with the distributed client. Spark offers this through the defaultParallelism variable; I’m looking for the same thing here.

Currently I have a function that tries to get this information by inspecting the various ways in which the client stores information about the scheduler and the worker spec, but I think it’s very fragile (the following still doesn’t take the SSHCluster config into account, for example) and it only works as long as these dictionaries are kept unchanged in the Dask implementation.

from typing import Any, Dict

def get_ncores(client) -> int:
    worker_spec: Dict[str, Any] = client.cluster.worker_spec
    nworkers = len(worker_spec)
    worker_opts: Dict[str, Any] = next(iter(worker_spec.values()))["options"]
    # In LocalCluster, the number of cores used per Dask worker is called
    # 'nthreads'. In the dask-jobqueue cluster specifications it is called
    # just 'cores'.
    ncoresperworker: int = worker_opts.get("nthreads", worker_opts.get("cores"))

    return nworkers * ncoresperworker

Thank you for your help,
Vincenzo

@vpadulan Welcome! Would sum(client.ncores().values()) not work for all cases?

Hi @pavithraes,
Thanks a lot for your answer! I tried ncores in the past but forgot about it when writing the post above. Unfortunately, it doesn’t work in all cases.
Specifically, I am most interested in batch resource managers like HTCondor and Slurm, used through dask-jobqueue. For the example that I posted above, this is what I get with ncores():

cluster = HTCondorCluster(cores=2, processes=2,
                          memory="2GB", disk="0.1GB",...)
cluster.scale(jobs=2)
client = Client(cluster)
[...]
>>> client.ncores()
{}

The issue is that the HTCondorCluster object doesn’t yet have information from the workers, probably because the resource manager hasn’t started them. I need this information from the spec that I passed to the object when creating it.

As a workaround, I can probably use ncores() whenever it doesn’t return an empty dict, and fall back to the custom logic from my previous post otherwise. Still, it would be nice if this function also worked with the cluster objects from dask-jobqueue.
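Something along these lines is what I have in mind (just a sketch; the helper name get_requested_ncores is mine, and it assumes the cluster object exposes a worker_spec dict as in my first post):

from typing import Any, Dict


def get_requested_ncores(client) -> int:
    # Fallback: derive the requested cores from the cluster spec,
    # as in the function from my previous post.
    worker_spec: Dict[str, Any] = client.cluster.worker_spec
    worker_opts: Dict[str, Any] = next(iter(worker_spec.values()))["options"]
    ncoresperworker = worker_opts.get("nthreads", worker_opts.get("cores"))
    return len(worker_spec) * ncoresperworker


def get_ncores(client) -> int:
    # Prefer what the scheduler reports once workers have connected...
    ncores = client.ncores()
    if ncores:
        return sum(ncores.values())
    # ...and fall back to the requested spec while the jobs are still queued.
    return get_requested_ncores(client)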

@vpadulan You are right that the number of cores isn’t presented consistently; I’d encourage you to open a feature request on dask-jobqueue. :smile:

You can also find some relevant information in the client.scheduler_info() dict, and do something like:

total_nthreads = sum(w['nthreads'] for w in client.scheduler_info()['workers'].values())

Does sum(client.nthreads().values()) get you the correct answer?


Hi @phobson,
Unfortunately, I get the same result as above:

>>> client.ncores()
{}
>>> client.nthreads()
{}

I guess it’s really a matter of a missing feature: the ncores/nthreads interfaces collect what the scheduler knows at that point in the application. With batch systems, since the jobs haven’t spawned yet, the scheduler has no clue about the worker resources. I had just hoped this was already generalized to all types of clusters supported by Dask. I will open a feature request on dask-jobqueue as suggested.
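For completeness, when blocking is acceptable I could also just wait for the workers to connect before querying the scheduler; this is only a sketch, and it obviously only helps once the batch jobs actually start:

from dask.distributed import Client

client = Client(cluster)
# Block until at least one worker has connected to the scheduler;
# after that, ncores()/nthreads() report real values instead of {}.
client.wait_for_workers(n_workers=1)
total_cores = sum(client.ncores().values())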
