Dask SSH Cluster setup

Hello everyone,

I have a question and will be happy to hear some ideas on this:

Situation:

  1. I am on a private network where the nodes are accessible via passwordless SSH
  2. I have 4 nodes - N1, N2, N3, N4
  3. I am running the Python code on N1 and want to have N2, N3, N4 as workers connected via SSH
  4. On each of the nodes (N1, N2, N3, N4), I have Dask installed in a Python virtual environment.
  5. I want to run 1 worker and 1 thread on each node

My questions:
Is there a way I can modify my `cluster = SSHCluster()` call to:

  • connect to the worker nodes via SSH
  • activate the Python virtual environment on the worker nodes before running code on them

I am using the code below. What should I pass in connect_options if I have passwordless SSH? Should it be passed for each node separately?

Running this code on N1

    from dask.distributed import Client, SSHCluster

    hosts = ["N2_hostname", "N3_hostname", "N4_hostname"]

    worker_options = {
        "nthreads": 1,
        "memory_limit": "30GB"
    }

    connect_options = {}

    remote_python = "//virtualenv/bin/python"

    cluster = SSHCluster(
        hosts=hosts,
        worker_options=worker_options,
        connect_options=connect_options,
        remote_python=remote_python
    )

    client = Client(cluster)

Hi @NewDaskUser, welcome to Dask community!

Your setup looks good, SSHCluster should be alright here. If you have already enabled passwordless SSH, you shouldn’t need any connect_options.

Are you encountering issues?

Also keep in mind that the first hostname given to SSHCluster will only launch the Scheduler.
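For illustration, a sketch of that layout with the four nodes from the question (the hostnames and interpreter path are placeholders, and this needs working passwordless SSH to actually run):

```python
from dask.distributed import Client, SSHCluster

# The first entry launches the Scheduler; each remaining entry launches a
# worker, so N1 schedules and N2-N4 do the work. All names are placeholders.
cluster = SSHCluster(
    hosts=["N1_hostname", "N2_hostname", "N3_hostname", "N4_hostname"],
    worker_options={"nthreads": 1},
    remote_python="/path/to/virtualenv/bin/python",  # interpreter on each node
)
client = Client(cluster)
```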

Thank you @guillaumeeb.

I did the setup and provided these variables:

    hosts = ["localhost", "localhost", "N2_node_hostname", "N3_node_hostname"]
    remote_python = "path_to_python_virtual_environment/bin/python"
    scheduler_options = {"port": 0, "dashboard_address": 8797}

To brief you about my setup:
1. I have a shell script.
2. Inside the shell script, I am using the source command to activate the virtual environment.
3. Then I run python my_script.py (this has the Dask logic) inside the shell script.
4. I am running the shell script with sh testing_dask.sh.

Error:
When I do this, I am able to make an SSH connection, but I see this error:
RuntimeError: Error during deserialization of the task graph. This frequently occurs if the Scheduler and Client have different environments

I checked:

  1. client.get_versions(check=True)
    A few parameters did not match: ‘OS-release’ and ‘LC_ALL’

  2. I tried, and the code works as expected in LocalCluster mode with 5 workers, 1 thread each, memory_limit=‘10GB’

  3. I tried to provide hosts = [“localhost”, “localhost”, “localhost”, “localhost”], but it still did not work

Question:

  1. Should all the parameters displayed in “client.get_versions” match? Is there any parameter I need to check apart from these?
    These are the parameters displayed:
  • Python
  • Python bits
  • OS
  • OS-release
  • Machine
  • Byte Order
  • LC_ALL
  • Packages
  2. Should the client, workers and scheduler all have the exact same copy of the values shared above?
  3. Can you give me an example of a delayed function which I can run? I tried bag and delayed, but it does not seem to work.

Not necessarily; what matters most is the virtual environment and the Python version.

What I would do here is a step by step testing, from a Python console or notebook:

  1. Create an SSH Cluster
  2. Open the Dashboard, and check that every worker is running correctly
  3. Do a get_versions as you’ve done
  4. Submit a simple task, like client.submit(lambda x: x+1, 10) and check the Dashboard or get the exception.
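The steps above can be sketched as follows. A LocalCluster stands in for the SSH cluster so the snippet is self-contained; the same client calls apply to an SSHCluster client:

```python
from dask.distributed import Client, LocalCluster

# Step 1: create a cluster (LocalCluster here as a stand-in for SSHCluster).
cluster = LocalCluster(n_workers=2, threads_per_worker=1, processes=False)
client = Client(cluster)

# Step 2: the dashboard address, if you can reach it from a browser.
print(cluster.dashboard_link)

# Step 3: compare client/scheduler/worker versions
# (check=True raises on mismatch instead of just reporting).
versions = client.get_versions(check=False)

# Step 4: submit a trivial task and fetch the result.
result = client.submit(lambda x: x + 1, 10).result()
print(result)  # 11

client.close()
cluster.close()
```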

First, you should make it work with only localhost; there is no reason to have problems within the same machine if you are pointing to the right Python environment.

Thank you @guillaumeeb for always taking the time to respond.
As per your suggestion, I did:

  1. Create an SSH Cluster - DONE

  2. Open the Dashboard, and check that every worker is running correctly - Unfortunately, I am not able to access the dashboard, as it is a RHEL system that I connect to and work on. But when I run the code, I see the workers starting

  3. Do a get_versions as you’ve done - I did, and the same two parameters had a mismatch: “OS-release” and “LC_ALL”

  4. Submit a simple task, like client.submit(lambda x: x+1, 10), and check the Dashboard or get the exception - YES, this command worked and did not raise an exception



CODE FILES

**Run_script.sh** - shell script to activate the virtual environment and run the Python code. I run this script using **sh Run_script.sh**, which runs the Python script

**Test_1.py** - This has a function *def delayed_func* to create the delayed objects

**Test_2.py** - This Python file has a function `def test_2_function` which is called from the delayed function inside Test_1.py

CODE STRUCTURE

Run_script.sh

    source <<activate virtual environment>>
    python Test_1.py

Test_2.py

    def test_2_function(json_filename):
        # Processes each JSON file
        # Reads each row and produces it to a Kafka topic
        # Does some processing logic based on successful and failed Kafka records
        # The function does not return anything

Test_1.py

    import dask
    from dask import delayed
    from dask.distributed import Client, SSHCluster
    from test_2 import test_2_function

    class DataTransfer:
        def delayed_func(self):
            json_filenames = ["file1.json", "file_2.json", "file_3.json"]
            delayed_results = [delayed(test_2_function)(file) for file in json_filenames]
            dask.compute(*delayed_results)

    if __name__ == '__main__':
        client = None
        cluster = None
        try:
            # CLIENT MODE
            # cluster = LocalCluster(dashboard_address=':8787', n_workers=3, threads_per_worker=1, memory_limit='16GB')
            # client = Client(cluster)

            # CLUSTER MODE
            cluster = SSHCluster(
                ["localhost", "localhost", "localhost", "localhost"],
                connect_options={"known_hosts": None, "username": <<username>>},
                worker_options={"nthreads": 2},
                scheduler_options={"port": 0, "dashboard_address": ":8797"},
                remote_python=["virtual_env_path/bin/python"] * 4
            )
            client = Client(cluster)
            data_transfer = DataTransfer()
            data_transfer.delayed_func()
        except Exception as e:
            print(f"Failed - Exception {e}")
        finally:
            if client:
                client.close()
            if cluster:
                cluster.close()

When I execute this, I get the exception:
RuntimeError: Error during deserialization of the task graph. This frequently occurs if the Scheduler and Client have different environments

Question:

  1. Is there something else I should do to modify my code? Or change the function to process the files?
  2. I tried to provide the node hostnames, which have a shared drive connected between all the nodes. Still no luck

Also, I had one observation, not sure if it is right or not. Since the packages are in the virtual environment, and I am just providing the path to the virtual environment and not activating it, the executable is a symlink to the system Python path.

So do I need to activate the virtual environment on each worker node before sending the task? How should I activate it?

Did you verify that you could gather the Future returned by the first call?

Do you have a more complete stacktrace?

I see you are using a class defined in the file, and a function imported from another file. Are these files in the virtual environment, or just in your current path? I’m wondering if the workers’ environments are correctly seeing and using those. It can be tricky to specify this correctly.
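One way to check what each worker actually sees is `client.run`, which executes a function on every worker and returns the results keyed by worker address. Here an in-process client stands in so the snippet is self-contained; against an SSHCluster the same calls would report each remote worker's interpreter and working directory:

```python
import os
import sys
from dask.distributed import Client

# In-process client as a stand-in for the SSHCluster client in this thread.
client = Client(processes=False)

# Which Python executable is each worker using?
# (With SSHCluster it should be the virtualenv interpreter on each node.)
executables = client.run(lambda: sys.executable)
print(executables)

# Which working directory does each worker run in?
# (This affects which local modules, like test_2.py, the workers can import.)
cwds = client.run(os.getcwd)
print(cwds)

client.close()
```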

@guillaumeeb thankyou for your inputs.

  • I used client.upload_file("test_2.py"), and then it no longer shows the error and I am able to run the code.
  • I believe the Python file test_2.py needs to be uploaded to the scheduler and workers using client.upload_file("test_2.py")
  • I have also removed all the classes and kept Test_1.py as a plain Python file.

For the new code below for Test_1.py - removing all classes and using the upload_file() method - I wanted to confirm:

  • Should this be dask.compute(*delayed_results) or client.compute(*delayed_results) in the code below to process the files in parallel (each file's execution is independent of the others)? I am currently using dask.compute(*delayed_results) to use the nodes created with the SSH cluster

  • If you suggest using client.compute(), then how should I ensure all the files are processed before the code moves on to the next part? Should I do something like this? Will this execute all the futures for the delayed objects in parallel?

    delayed_results = [delayed(test_2_function)(file, param1, param2) for file in json_filenames]
    future_results = [client.compute(res) for res in delayed_results]
    results = [fs.result() for fs in future_results]

Test_1.py

    import dask
    from dask import delayed
    from dask.distributed import Client, SSHCluster
    from test_2 import test_2_function

    def delayed_func():
        json_filenames = ["file1.json", "file_2.json", "file_3.json", "file_4.json", "file_5.json", "file_6.json"]
        param1 = <<value>>
        param2 = <<value>>

        # this calls the function test_2_function inside test_2.py
        delayed_results = [delayed(test_2_function)(file, param1, param2) for file in json_filenames]
        dask.compute(*delayed_results)

    if __name__ == '__main__':
        client = None
        cluster = None
        try:
            # CLIENT MODE
            # cluster = LocalCluster(dashboard_address=':8787', n_workers=3, threads_per_worker=1, memory_limit='16GB')
            # client = Client(cluster)

            # CLUSTER MODE
            cluster = SSHCluster(
                ["localhost", "localhost", "node_2_hostname", "node_3_hostname", "node_4_hostname"],
                connect_options={"known_hosts": None, "username": <<username>>},
                worker_options={"nthreads": 2},
                scheduler_options={"port": 0, "dashboard_address": ":8797"},
                remote_python=["virtual_env_path/bin/python"] * 5
            )
            client = Client(cluster)
            client.upload_file("test_2.py")
            delayed_func()
        except Exception as e:
            print(f"Failed - Exception {e}")
        finally:
            if client:
                client.close()
            if cluster:
                cluster.close()

These are equivalent.

There are ways also to wait for futures if you need to: Futures — Dask documentation.
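A minimal sketch of both options side by side, using an in-process client as a stand-in for the SSHCluster client in this thread:

```python
import dask
from dask import delayed
from dask.distributed import Client, wait

# In-process client as a stand-in for the SSHCluster client.
client = Client(processes=False)

@delayed
def process(x):
    return x * 2

tasks = [process(i) for i in range(5)]

# Option 1: dask.compute blocks until done and returns the results directly.
# With a distributed Client active, it runs on that cluster, not locally.
results_a = dask.compute(*tasks)

# Option 2: client.compute returns Futures immediately; wait() blocks until
# they all finish, and gather() fetches the results.
futures = client.compute(tasks)
wait(futures)
results_b = client.gather(futures)

print(results_a)  # (0, 2, 4, 6, 8)
print(results_b)  # [0, 2, 4, 6, 8]

client.close()
```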

Sure @guillaumeeb, I will continue to use dask.compute() to process the data on the SSH cluster in my code.

I was confused about whether using dask.compute() would run the code in local cluster mode, since we are not using the client in dask.compute(). But since you confirmed, I will use dask.compute() in my code.

Thank you for sending the futures link; I will also explore futures.


Thank you @guillaumeeb. Since it was a long thread, I marked your last answer as the solution!
