Unbalanced utilisation of machines on SSHCluster (few with multiple tasks, few idling)

Hey

I am using an SSHCluster to perform a random search task. I have about 40 machines available and I initialise the cluster with about 45 tasks. Each time a task finishes, I add another one. My problem is that the tasks don’t seem to be distributed very efficiently: currently, I have 45 tasks pending, some machines have a queue of up to 4 tasks, and about 10 machines are idling. The progress section of the dashboard only lists one function name, so there should not be any dependencies across workers. Is there any way to force a redistribution?

My code looks similar to this:

from dask.distributed import Client, SSHCluster, as_completed
from pandas import read_csv
from termcolor import colored

def init_cluster():
    computers = read_csv(path_to_some_file)['computer_name'].to_list()

    try:
        # Reuse an already running scheduler if one is reachable.
        client = Client('tcp://<ip>:37153', timeout=5)
    except Exception:
        # Otherwise start a new cluster over SSH.
        cluster = SSHCluster(
            hosts=computers,
            connect_options={
                "known_hosts": None,
                "username": "[redacted]",
            },
            worker_options={
                "n_workers": 1,
            },
            scheduler_options={
                "port": 37153,
                "dashboard_address": ":8797",
            },
        )

        client = Client(cluster)

    print(colored(f'{client.dashboard_link}', 'green'))

    return client

client = init_cluster()
futures = []
for i in range(45):
    params = get_random_paramset()
    futures.append(client.submit(main_job, params, config))

seq = as_completed(futures)

for f in seq:
    handle_result(f.result(), second_arg)
    f.cancel()
    del f
    if not condition:
        seq.add(client.submit(main_job, get_random_paramset(), config))

Hi @f-fritz and welcome to discourse!

Thanks for the snippet, it’s very helpful to know how you’re configuring the SSH cluster! I’m not quite sure what is going wrong here, but a couple things to note:

  • instead of calling client.submit in a for-loop, you can use client.map (see here in the docs for an example).
  • From the f.cancel() and del f lines, are you also having memory issues?
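
For the first point, here is a minimal sketch of what that could look like. The helper name submit_batch is hypothetical, and the sketch assumes that main_job accepts config as a keyword argument; Client.map forwards keyword arguments unchanged to every call.

```python
def submit_batch(client, func, param_sets, config):
    """Submit one task per parameter set and return the list of futures.

    `client` is expected to be a dask.distributed.Client (assumption);
    `submit_batch` itself is a hypothetical helper, not part of Dask.
    """
    # Client.map calls func(params, config=config) once per element of
    # param_sets; keyword arguments are passed unchanged to every call.
    return client.map(func, param_sets, config=config)
```

It could be called as futures = submit_batch(client, main_job, [get_random_paramset() for _ in range(45)], config), and the resulting list can be passed to as_completed as before.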

Would you also be able to share more on what kind of work main_job is doing? A screenshot of the progress panel from the dashboard would also be helpful, or perhaps another of these diagnostics?

Hi,

Thanks for your support!

The main_job(params, config) function is used for random search: it takes a params dict with random, individual parameters and a config dict with fixed configuration information. It will then

  • load a small (<< 1GB) numpy .npy file from disk (NFS share mounted on all workers) into memory, depending on the parameters in params
  • train a neural net on the npy data using tensorflow
  • evaluate it
  • append the results (a few floats and strings) to the params dict and return that

Upon completion, the handle_result() function will store the result in a csv file.
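
A minimal sketch of that last step, to make the setup concrete. This is not the actual handle_result from the project: the out_path argument and the sorted column order are assumptions for illustration, and it assumes every result dict has the same keys.

```python
import csv
import os


def handle_result(result, out_path):
    """Append one result dict as a row to a CSV file (hypothetical helper)."""
    # Write the header only when the file does not exist yet or is empty.
    write_header = not os.path.exists(out_path) or os.path.getsize(out_path) == 0
    with open(out_path, "a", newline="") as fh:
        # Sorting the keys keeps the column order stable across calls.
        writer = csv.DictWriter(fh, fieldnames=sorted(result))
        if write_header:
            writer.writeheader()
        writer.writerow(result)
```

Because this runs on the client as each future completes, only one process writes to the file, so no locking is needed in this sketch.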

The duration of main_job() depends mostly on the number of epochs but is at least 1 min. Also, main_job() is a serial function and cannot be parallelised further. The dashboard only shows the main_job function in the bottom right. So my use case for the dask SSHCluster is to run multiple instances of main_job in parallel to perform random search, not to parallelise a single task further or to load very large datasets. It looks like that’s a rather unique use case (especially with the while-style loop around if not condition), and I did not find much information about it.

When I start the cluster with the first 45 tasks, the bars in the graph form an L shape: all computers have at least one task to do, and some machines at the bottom start with two tasks (as 45 tasks is slightly more than the number of my workers). But after a few iterations, some workers idle while others have e.g. four tasks queued (see screenshot). Maybe the tasks are distributed uniformly instead of according to current load? Some of my tasks take considerably more time than others.

Thanks for the client.map() suggestion, I will try that!

I have not run into memory issues yet, but I saw on the dashboard that the results were kept in memory after I had used them in the for f in seq: loop (at least I think so). So I thought it might be necessary to drop those futures manually as a preventive measure.

However, if I increase the number of initial tasks from 45 to e.g. 90, the cluster crashes almost immediately with “[Errno 32] Broken pipe”. But that’s for another ticket as it’s currently easy to avoid. There might be other solutions that I haven’t come across yet, but this error is the reason I chose the while loop, trying to continuously feed the cluster with jobs. Maybe rewriting the script to use map() + an iterator would solve the utilisation issue. I’ll try that later…

I just tried to map a function + iterator to the cluster. I thought it might then always have exactly as many tasks running as it can handle, and thus fully utilise the cluster.

Unfortunately, it looks like this approach is deprecated?

import dask
import time
from main import init_cluster

def sum(a,b,c):
    time.sleep(10)
    return a+b+c

client = init_cluster()

class InfIter:
    """Infinite iterator to return all
        odd numbers. Source: https://www.programiz.com/python-programming/iterator"""

    def __iter__(self):
        self.num = 1
        return self

    def __next__(self):
        num = self.num
        self.num += 2
        return num, num+1, num+2


a = InfIter()
b = iter(a)

L = client.map(sum, b)

outputs:

---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-79-f5efbae64cf8> in <module>
----> 1 L = client.map(sum, b)

~/miniconda3/envs/ma/lib/python3.8/site-packages/distributed/client.py in map(self, func, key, workers, retries, resources, priority, allow_other_workers, fifo_timeout, actor, actors, pure, batch_size, *iterables, **kwargs)
   1861             isinstance(i, Iterator) for i in iterables
   1862         ):
-> 1863             raise TypeError(
   1864                 "Dask no longer supports mapping over Iterators or Queues."
   1865                 "Consider using a normal for loop and Client.submit"

TypeError: Dask no longer supports mapping over Iterators or Queues.Consider using a normal for loop and Client.submit