Hi,
I would like to know if there is a single, coherent way to retrieve the requested number of cores when creating any type of distributed cluster. This information should be retrievable from the client object. Examples:
from dask.distributed import Client, LocalCluster

cluster = LocalCluster(n_workers=4, threads_per_worker=1, processes=True)
client = Client(cluster)
client.get_ncores()  # This should return 4
from dask_jobqueue import HTCondorCluster

cluster = HTCondorCluster(cores=2, processes=2,
                          memory="2GB", disk="0.1GB", ...)
cluster.scale(jobs=2)
client = Client(cluster)
client.get_ncores()  # This should return 4 (2 cores per job)
from dask.distributed import SSHCluster

cluster = SSHCluster(
    ["scheduler", "worker1", "worker2"],
    worker_options={"nprocs": 2, "nthreads": 1, "memory_limit": "32GB"},
)
client = Client(cluster)
client.get_ncores()  # This should return 4 (2 cores per worker node)
And so on for any kind of cluster that can be used with the distributed client. Spark has this feature through the defaultParallelism property; I'm looking for the same thing here.
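For reference, this is roughly what I mean, as a minimal PySpark sketch (assuming a local master with four cores requested):

from pyspark import SparkContext

sc = SparkContext(master="local[4]")
print(sc.defaultParallelism)  # 4: the total parallelism requested at creation
sc.stop()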
Currently I have a function that tries to get this information by looking at the various ways in which the client stores information about the scheduler and the worker spec. I think it's very fragile (for example, the following still doesn't take the SSHCluster configuration into account), and it only works as long as these dictionaries are kept unchanged in the Dask implementation.
from typing import Any, Dict

def get_ncores(client) -> int:
    worker_spec: Dict[str, Any] = client.cluster.worker_spec
    nworkers = len(worker_spec)
    worker_opts: Dict[str, Any] = next(iter(worker_spec.values()))["options"]
    # In LocalCluster, the number of cores used per Dask worker is called
    # 'nthreads'. In the dask-jobqueue cluster specifications it is called
    # just 'cores'.
    ncoresperworker: int = worker_opts.get("nthreads", worker_opts.get("cores"))
    return nworkers * ncoresperworker
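A runtime-based workaround that comes to mind is summing client.nthreads() (also exposed under the older name client.ncores()), but that only counts workers that have already connected to the scheduler, not the number that was requested, so it's a fallback sketch rather than an answer to my question:

def get_connected_ncores(client) -> int:
    # client.nthreads() maps each connected worker's address to its
    # thread count, e.g. {'tcp://127.0.0.1:35000': 1, ...}
    return sum(client.nthreads().values())

Combined with client.wait_for_workers(n), this matches the requested total for the examples above, but only after all workers have actually started.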
Thank you for your help,
Vincenzo