Accessing dashboard of scheduler started programmatically

Consider the below setup:

from dask.distributed import Client, SSHCluster

cluster = SSHCluster(
    ["localhost", "localhost"],  # first host runs the scheduler, the rest run workers
    connect_options={"known_hosts": None},
    worker_options={"n_workers": 6},
    scheduler_options={"port": 0, "dashboard_address": ":8797"},
)

client = Client(cluster)

This starts a scheduler and 6 workers on localhost.
How can I access the dashboard?

First of all, I am specifying port 8797. When the program is running, I try to access localhost:8797 and get the below:

Dask needs bokeh!=3.0.*,>=2.4.2 for the dashboard.
Install with conda: conda install bokeh!=3.0.*,>=2.4.2
Install with pip: pip install bokeh!=3.0.*,>=2.4.2

but bokeh is installed, version 2.4.2 to be specific. I am using a virtual environment, and I made sure bokeh is installed both in the virtual environment and in the main Python environment on the Linux VM.

Furthermore in the output I see:

distributed.worker - INFO - dashboard at: 192.168.0.15:44891

Accessing this link gives: 404 not found

I made sure port 44891 is open on the VM. In any case, this port is different on every run, and why isn’t it 8797? Why is port 8797 “accessible”, but giving the “bokeh” error?

I looked at this link and tried the below command from a terminal opened in the virtual environment:

ssh -L 8000:localhost:44891 jurgen@localhost
dask-scheduler

Accessing localhost:8000 works but this seems to be a completely new scheduler, and not the one started programmatically. Any help with this? Conceptually I am also a bit lost: are we even able to connect with the dashboard of a scheduler started programmatically? What is the easiest way to do it? Thanks in advance.

This is weird, this still looks like an environment problem to me. Could you try to call client.get_versions(check=True) and report the result?
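Since the bokeh message points at an environment mismatch, it can help to ask the scheduler process itself what it sees. Below is a minimal sketch of such a check. `environment_report` and `python_executable` are hypothetical helper names, and an in-process `LocalCluster` stands in for the SSH cluster so the snippet is self-contained. With the real setup, only the `client` would differ.

```python
import sys

from dask.distributed import Client, LocalCluster


def python_executable():
    """Return the interpreter path of whichever process runs this function."""
    return sys.executable


def environment_report(client):
    # Ask scheduler, workers and client for their versions, including bokeh.
    versions = client.get_versions(packages=["bokeh"])
    return {
        "scheduler_bokeh": versions["scheduler"]["packages"].get("bokeh"),
        "client_bokeh": versions["client"]["packages"].get("bokeh"),
        # Runs on the scheduler, so it shows which Python the scheduler uses.
        "scheduler_python": client.run_on_scheduler(python_executable),
        "client_python": sys.executable,
    }


if __name__ == "__main__":
    # Throwaway in-process cluster, used here instead of the SSHCluster.
    with LocalCluster(n_workers=1, processes=False, dashboard_address=":0") as cluster:
        with Client(cluster) as client:
            for key, value in environment_report(client).items():
                print(f"{key}: {value}")
```

If `scheduler_python` is not the virtual environment's interpreter, or `scheduler_bokeh` is `None`, the scheduler is probably running in an environment without bokeh.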

The address in that log line (192.168.0.15:44891) is the Worker dashboard, not the main distributed Dashboard. Each worker also has a dashboard.

See below.

{'scheduler': {'host': {'python': '3.11.2.final.0', 'python-bits': 64, 'OS': 'Linux', 'OS-release': '6.2.0-32-generic', 'machine': 'x86_64', 'processor': 'x86_64', 'byteorder': 'little', 'LC_ALL': 'None', 'LANG': 'en_US.UTF-8'}, 'packages': {'python': '3.11.2.final.0', 'dask': '2023.8.1', 'distributed': '2023.8.1', 'msgpack': '1.0.5', 'cloudpickle': '2.2.1', 'tornado': '6.3.3', 'toolz': '0.12.0', 'numpy': '1.24.2', 'pandas': '2.1.0', 'lz4': None}}, 'workers': {'tcp://192.168.0.19:34079': {'host': {'python': '3.11.2.final.0', 'python-bits': 64, 'OS': 'Linux', 'OS-release': '6.2.0-32-generic', 'machine': 'x86_64', 'processor': 'x86_64', 'byteorder': 'little', 'LC_ALL': 'None', 'LANG': 'en_US.UTF-8'}, 'packages': {'python': '3.11.2.final.0', 'dask': '2023.8.1', 'distributed': '2023.8.1', 'msgpack': '1.0.5', 'cloudpickle': '2.2.1', 'tornado': '6.3.3', 'toolz': '0.12.0', 'numpy': '1.24.2', 'pandas': '2.1.0', 'lz4': None}}, 'tcp://192.168.0.19:42695': {'host': {'python': '3.11.2.final.0', 'python-bits': 64, 'OS': 'Linux', 'OS-release': '6.2.0-32-generic', 'machine': 'x86_64', 'processor': 'x86_64', 'byteorder': 'little', 'LC_ALL': 'None', 'LANG': 'en_US.UTF-8'}, 'packages': {'python': '3.11.2.final.0', 'dask': '2023.8.1', 'distributed': '2023.8.1', 'msgpack': '1.0.5', 'cloudpickle': '2.2.1', 'tornado': '6.3.3', 'toolz': '0.12.0', 'numpy': '1.24.2', 'pandas': '2.1.0', 'lz4': None}}, 'tcp://192.168.0.19:42955': {'host': {'python': '3.11.2.final.0', 'python-bits': 64, 'OS': 'Linux', 'OS-release': '6.2.0-32-generic', 'machine': 'x86_64', 'processor': 'x86_64', 'byteorder': 'little', 'LC_ALL': 'None', 'LANG': 'en_US.UTF-8'}, 'packages': {'python': '3.11.2.final.0', 'dask': '2023.8.1', 'distributed': '2023.8.1', 'msgpack': '1.0.5', 'cloudpickle': '2.2.1', 'tornado': '6.3.3', 'toolz': '0.12.0', 'numpy': '1.24.2', 'pandas': '2.1.0', 'lz4': None}}, 'tcp://192.168.0.19:45899': {'host': {'python': '3.11.2.final.0', 'python-bits': 64, 'OS': 'Linux', 'OS-release': '6.2.0-32-generic', 
'machine': 'x86_64', 'processor': 'x86_64', 'byteorder': 'little', 'LC_ALL': 'None', 'LANG': 'en_US.UTF-8'}, 'packages': {'python': '3.11.2.final.0', 'dask': '2023.8.1', 'distributed': '2023.8.1', 'msgpack': '1.0.5', 'cloudpickle': '2.2.1', 'tornado': '6.3.3', 'toolz': '0.12.0', 'numpy': '1.24.2', 'pandas': '2.1.0', 'lz4': None}}}, 'client': {'host': {'python': '3.11.2.final.0', 'python-bits': 64, 'OS': 'Linux', 'OS-release': '6.2.0-32-generic', 'machine': 'x86_64', 'processor': 'x86_64', 'byteorder': 'little', 'LC_ALL': 'None', 'LANG': 'en_US.UTF-8'}, 'packages': {'python': '3.11.2.final.0', 'dask': '2023.8.1', 'distributed': '2023.8.1', 'msgpack': '1.0.5', 'cloudpickle': '2.2.1', 'tornado': '6.3.3', 'toolz': '0.12.0', 'numpy': '1.24.2', 'pandas': '2.1.0', 'lz4': None}}}

Any idea?

Hi @guillaumeeb, today I seem to be able to access the dashboard. Perhaps a restart of the VM fixed it.

Based on the new information that I now have access to, I have some questions, please:

  1. When using dask.compute() on dask.delayed, say on 4 workers, are we using Tasks? What should I be seeing in the Tasks tab? (solved further down)
  2. The Graph tab is empty. In the output I see: UserWarning: Sending large graph of size 253.41 MiB. This may cause some slowdown. Consider scattering data ahead of time and using futures. Should I be seeing something in the Graph tab? (solved further down)
  3. The Workers tab shows the 4 workers. I am using a VM. The host machine has 32 GB of RAM. The VM has been allocated 20 GB of RAM. The workers seem to have a limit of 10.2 GB each. Combined, this amounts to 40.7 GB. How is this possible?
  4. What am I supposed to be seeing in the Status tab? Everything is empty. (solved further down)
  5. Is there any information that can help me understand more about the communication overhead, i.e. overhead related to assigning work to the multiple workers? Such as when passing large data structures as parameters to the workers.
  6. Why am I seeing a cap of max 6% CPU on each worker? This is so low. No wonder it is taking so long to finish the work. (solved further down)
  7. Why is Dask using so much memory? Prior to discovering Dask I was experimenting with multiprocessing. I could easily use 8 processes and not run out of memory. With Dask, I am at the borderline with 4 workers. (I am considering Dask to distribute across multiple nodes in a cluster, but for the time being I am testing on a single VM.)
  8. (Update 1) Should the dashboard still be accessible, and the workers still running, after calling client.shutdown()? How do we properly close a cluster, with all its workers? (solved further down)
  9. (Update 1) The IP addresses of the workers in the dashboard don’t match the ones I am seeing in the console. This likely explains the above (the workers being shown in that dashboard are not actually doing anything). Why can’t I see the newly created workers, for the current run, in the dashboard? Why do the workers change on every run? And why are they not assigned to the same “dashboard” if the same port is used on subsequent runs?
  10. (Update 2) I restarted the VM and for the first run, the IPs I am seeing in the console match those in the Workers tab of the Dask dashboard. The CPU is running at over 100%. Again, how is this possible (> 100%)? But at least I think it makes more sense, and explains 6 above.
  11. (Update 2) I am looking at the Processes in System Monitor, and there are 6 Python3.11 processes. One of them is using around 7.2 GB, another is using 5.8 GB, and the other 4 are using around 2.2 GB each. Is the first one the client, and the second one the scheduler? The last 4 are obviously the workers.
  12. (Update 2) After having restarted, and ensuring that client.shutdown() is being called at the end of the script, the dashboard is no longer accessible. Upon running again, I get access to the dashboard again, and I get workers that match what I am seeing in the console. But in some cases the script may hang and not get as far as the client.shutdown(), or in my case I didn’t have the client.shutdown() in the first place. On subsequent runs a new scheduler is created, yet the dashboard keeps showing the workers created in the very first run. Is this intended functionality? Can we somehow close the initial dashboard with its scheduler and workers, to ensure that we always have access to the resources of the latest run? (This explains most of the above.)

I am reading the documentation, but some of my questions are much more specific. Your reply would be most appreciated.

Okay, that’s a lot of questions, I’ll try to answer them as much as I can.

For 1: No matter which API/Collection you are using, it will build a task graph, and each task will be run on Workers. So you should see those tasks on the corresponding tab.

For 2, 4, 6, 9: it seems you are starting new Dask clusters without properly shutting down old ones. This will make the new Scheduler and Dashboard, but also workers, use other available ports, even if you try to specify one (because this one is already used). So you are not looking at the correct Dashboard.
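The port behaviour described here can be reproduced without Dask. The following is a minimal sketch using plain sockets, not Dask's actual implementation. `bind_or_fallback` is a hypothetical helper that mimics "use the requested port if it is free, otherwise take any free one":

```python
import socket


def bind_or_fallback(preferred_port, host="127.0.0.1"):
    """Listen on preferred_port, or on an OS-chosen free port if it is taken."""
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        sock.bind((host, preferred_port))
    except OSError:
        # The port is still held by another process, e.g. an old scheduler
        # that was never shut down. Port 0 asks the OS for any free port.
        sock.close()
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.bind((host, 0))
    sock.listen()
    return sock


# First "run": grabs a port and never releases it.
first = bind_or_fallback(0)
wanted = first.getsockname()[1]

# Second "run": asks for the same port but ends up on a different one.
second = bind_or_fallback(wanted)
actual = second.getsockname()[1]
print(f"requested {wanted}, got {actual}")

first.close()
second.close()
```

In the setup from this thread, a browser tab on port 8797 would in that situation still be talking to the first, stale scheduler.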

For 3: What are the parameters you are giving to dask-ssh? Dask should be able to automatically calculate the amount of memory for each worker based on the amount of memory available on the host.

For 5: see https://distributed.dask.org/en/stable/efficiency.html
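One technique for reducing this overhead, and the one the "Sending large graph" warning itself suggests, is to scatter large inputs once and pass futures to the tasks. A minimal sketch, assuming a toy task and made-up data (`total_length`, `run_with_scatter` and the `lookup` dictionary are all hypothetical):

```python
from dask.distributed import Client, LocalCluster


def total_length(keys, lookup):
    """Toy task: needs the whole large structure plus a small list of keys."""
    return sum(len(lookup[key]) for key in keys)


def run_with_scatter(client):
    # Stand-in for a large data structure (hypothetical data).
    lookup = {i: "x" * 100 for i in range(1000)}
    key_chunks = [list(range(start, start + 250)) for start in range(0, 1000, 250)]

    # Send the large object to the workers once. It is wrapped in a list
    # because scatter() splits a bare dict or list into separate futures.
    [lookup_future] = client.scatter([lookup], broadcast=True)

    # Each task now receives a small reference instead of a copy of the data.
    futures = [client.submit(total_length, keys, lookup_future) for keys in key_chunks]
    return client.gather(futures)


if __name__ == "__main__":
    # In-process cluster used only to keep the sketch self-contained.
    with LocalCluster(n_workers=2, processes=False, dashboard_address=":0") as cluster:
        with Client(cluster) as client:
            print(run_with_scatter(client))
```

Whether this helps in a given workflow depends on how often the large inputs change between tasks. Data that is regenerated for every call still has to travel each time.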

For 7: We would need to see your workflow to better understand the problem. For an embarrassingly parallel workload, Dask will usually need at most (number of worker threads) x (partition size) x 2 as its memory footprint.
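As a quick worked example of that rule of thumb (the numbers are hypothetical, and `estimate_peak_memory_gib` is just an illustrative helper, not a Dask function):

```python
def estimate_peak_memory_gib(n_workers, threads_per_worker, partition_gib, factor=2.0):
    """Rule-of-thumb upper bound: worker threads x partition size x 2."""
    total_threads = n_workers * threads_per_worker
    return total_threads * partition_gib * factor


# Hypothetical numbers: 4 workers, 4 threads each, 0.25 GiB partitions.
estimate = estimate_peak_memory_gib(n_workers=4, threads_per_worker=4, partition_gib=0.25)
print(f"estimated peak: {estimate} GiB")  # estimated peak: 8.0 GiB
```

The estimate only covers embarrassingly parallel work. Tasks that take and return large structures, as described later in this thread, can hold considerably more.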

For 8: No, it should not. Calling client.shutdown() is the correct way to close the cluster.

For 9: As explained above, if you start a fresh Dask cluster each time and don’t properly close the previous ones, you’ll use random ports.

For 10: Are you using threads?

For 11: Well, it’s hard to tell…

For 12: Again, the new Scheduler and Workers will use another port, you’re still looking at the old ones.
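One way to make sure a run does not leave an old scheduler behind, even when the script raises halfway through, is to tie the cluster's lifetime to a `with` block. A minimal sketch, assuming an in-process `LocalCluster` as a stand-in for the SSH setup (`run_job` and its `fail` flag are hypothetical):

```python
from dask.distributed import Client, LocalCluster


def run_job(fail=False):
    """Run a tiny computation; the cluster is closed on success and on error."""
    with LocalCluster(n_workers=2, threads_per_worker=1, processes=False,
                      dashboard_address=":0") as cluster:
        with Client(cluster) as client:
            if fail:
                raise RuntimeError("simulated crash in the middle of the script")
            return client.submit(sum, [1, 2, 3]).result()
    # Leaving the with-blocks closes the client first, then the cluster
    # (scheduler, workers and dashboard).


if __name__ == "__main__":
    print(run_job())
```

A process terminated with SIGKILL (the bare “Killed” message) skips this cleanup, so leftovers from such a run would still have to be stopped by hand.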

Wow, thanks for taking the time to reply to me in such detail, and apologies for my lengthy comment and long list of questions. You have answered most that I needed to know and I will try to keep it shorter next time.

The parameters I am giving to Dask can be seen in the code snippet in the original question. I am specifying n_workers, and not specifying the number of threads. I assume this means that Dask would try to utilise all the available threads, across all the available cores. And I suspect that utilising all the threads may use more memory. When using a pool with the multiprocessing library, each process probably runs on a single core, and does not utilise multithreading. If this is the case, it would be quite a divergence from how Dask operates and might explain the difference in memory footprint.

To utilise a single thread, I guess we can just specify “nthreads” as 1. And can we assign affinity to a worker node, i.e. force a worker to run on a specific core?

Finally, a small note about my workflow. I have read more about Dask and have perhaps started to understand more about what you mean by “embarrassingly parallel”. Most examples seem to deal with applying distributed computations on massive data structures. I have large data structures, but my worker methods are not simply applying computations over these data structures. Instead they feature complex logic. The worker methods accept large data structures as parameters and return large data structures as output. The output data structures are generated in the context of each worker method, returned to the local process, combined, and passed as parameters to the next worker process (excuse my attempt at oversimplification, trying to keep it short and failing somehow :) ). This means there are a lot of dynamic parameters that may not simply be initialized as static data on the worker node’s side. Initially I was splitting this data as much as possible, and ensuring I send only the data that is required to each worker, and in Dask terms this sounds like a partitioning problem. However, would using a Dask data structure with partitions make a positive difference, when compared to a simple split Python array, given the kind of worker methods (type of computation) that I have?

Essentially with multiprocessing I got some great results. But I wanted to look past the constraints of a single node, and consider distributing the work further over a cluster. I am still not sure if Dask is the right tool for me, and this is what I am trying to figure out. Your help is hugely appreciated.

I tried setting nthreads to 1, and the situation is actually worse. It always crashes with the KilledWorker exception, very likely because it keeps using up memory until it exceeds the max memory on the VM (it always seems close to the max memory when it crashes). I tried with 4 workers and 2 workers, and it doesn’t make a difference.

However, the error is:

distributed.scheduler.KilledWorker: Attempted to run task worker_cccce54c-d187-46dc-91cd-9f7af716097c on 3 different workers, but all those workers died while running it.

Which doesn’t suggest memory to be the issue.

What did you mean by “are you using threads?”. Does the “nthreads = 1” idea make sense?

The error log proceeds to suggest checking the logs for the workers. Where do I get these from? I did not find anything in the dask-scratch-space folder.

I am also sometimes getting “Killed” exceptions when running with no nthreads parameter, and 4 workers, which was previously always working. For example, on one specific run, I got a “Killed” message in the console, and this seems to have killed the Scheduler. However, the workers were not killed, and they processed their workload successfully (without running out of memory). How can I understand why the scheduler is being killed in such a case?

I am lost :(

Yes, Dask should behave like this. Could you launch Dask-ssh with the 6-worker configuration and report the worker configuration you get? How many processes, how much memory, and how many threads per worker does Dask choose? You should be able to see this in the cluster widget, or in the Dashboard Info tab.

Well, it should not, but then it depends on your workflow, chunking scheme, etc.

No I don’t think you can force affinity, but I’m not sure you really need to?

This might explain your memory problems. It’s a bit too abstract for us to help here. If you could provide a minimal example, this would really help in identifying potential memory problems.

It might, yes.

Do you have the complete log? It can also be Dask killing the worker because it tries to use more memory than the limit Dask gave it.

With threads, or depending on the library you are using, you can perfectly well see more than 100% CPU usage. The percentage is counted per core, so a worker keeping two cores busy shows around 200%.

You can try client.get_worker_logs().

Your best hope is the Scheduler logs.
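Both kinds of logs can be pulled through the client while the scheduler is still reachable. A minimal sketch, again with an in-process `LocalCluster` as a stand-in for the real cluster (`collect_logs` is a hypothetical helper name):

```python
from dask.distributed import Client, LocalCluster


def collect_logs(client, n=20):
    """Fetch the most recent log lines kept in memory by workers and scheduler."""
    # Mapping of worker address to a sequence of (level, message) pairs.
    worker_logs = client.get_worker_logs(n=n)
    # Sequence of (level, message) pairs from the scheduler.
    scheduler_logs = client.get_scheduler_logs(n=n)
    return worker_logs, scheduler_logs


if __name__ == "__main__":
    with LocalCluster(n_workers=2, processes=False, dashboard_address=":0") as cluster:
        with Client(cluster) as client:
            worker_logs, scheduler_logs = collect_logs(client)
            for address, entries in worker_logs.items():
                print(f"--- worker {address}")
                for level, message in entries:
                    print(level, message)
            print("--- scheduler")
            for level, message in scheduler_logs:
                print(level, message)
```

These calls go through the scheduler, so they cannot help once the scheduler process itself has been killed.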

In the end, it’s really hard to help without a minimal reproducer.

Also, since you are currently not distributing across several machines, I would highly recommend testing Dask with a LocalCluster setup first; it is much easier to investigate.
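A minimal sketch of such a LocalCluster setup, which also prints the per-worker threads and memory limit asked about earlier. The worker count, thread count and memory limit are illustrative values, and `describe_workers` is a hypothetical helper. `processes=False` keeps everything in one process for simplicity; dropping it gives one process per worker, which is closer to the SSH setup.

```python
from dask.distributed import Client, LocalCluster


def describe_workers(client):
    """Return {worker address: (nthreads, memory limit in bytes)}."""
    workers = client.scheduler_info()["workers"]
    return {addr: (w["nthreads"], w["memory_limit"]) for addr, w in workers.items()}


if __name__ == "__main__":
    with LocalCluster(n_workers=4, threads_per_worker=1, memory_limit="2GiB",
                      processes=False, dashboard_address=":8797") as cluster:
        with Client(cluster) as client:
            # The link reflects the port actually in use, which may differ
            # from 8797 if that port was already taken.
            print("dashboard:", cluster.dashboard_link)
            for addr, (nthreads, limit) in describe_workers(client).items():
                print(addr, nthreads, "thread(s),", round(limit / 2**30, 2), "GiB")
```

Printing `cluster.dashboard_link` on every run is a cheap way to avoid opening the dashboard of a previous run.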