To brief you about my setup:
1. I have a shell script.
2. Inside the shell script I use the source command to activate the virtual environment.
3. Then I run `python my_script.py` (this has the Dask logic) inside the shell script.
4. I run the shell script with `sh testing_dask.sh`.
Error:
When I do this, I am able to make an SSH connection, but I see this error: `RuntimeError: Error during deserialization of the task graph. This frequently occurs if the Scheduler and Client have different environments`
I checked:
`client.get_versions(check=True)` - a few parameters did not match: "OS-release" and "LC_ALL"
The code works as expected in LocalCluster mode with 5 workers, 1 thread each, memory_limit="10GB".
I tried to provide hosts = ["localhost", "localhost", "localhost", "localhost"], but it still did not work.
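For reference, this is roughly what I mean by both setups (a sketch, not my exact code):

```python
from dask.distributed import Client, LocalCluster, SSHCluster

# The LocalCluster configuration that works for me
cluster = LocalCluster(n_workers=5, threads_per_worker=1, memory_limit="10GB")
client = Client(cluster)

# The SSHCluster attempt (first address runs the scheduler, the rest run workers)
# cluster = SSHCluster(["localhost", "localhost", "localhost", "localhost"])
# client = Client(cluster)
```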
Question:
Should all the parameters displayed by `client.get_versions` match? Is there any other parameter I need to check apart from these?
These are the parameters displayed:
Python
Python bits
OS
OS-release
Machine
Byte Order
LC_ALL
Packages
Should the client, workers and scheduler have exactly the same values for all of the fields listed above?
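For context, this is roughly how I am comparing the values (a sketch, assuming the usual layout of the dict returned by get_versions):

```python
# client connected to the SSH cluster, as above
versions = client.get_versions()

# assuming the layout {'client': {...}, 'scheduler': {...}, 'workers': {addr: {...}}},
# each entry with a 'host' section and a 'packages' section
for field in ("python", "OS-release", "LC_ALL"):
    print("client   :", versions["client"]["host"].get(field))
    print("scheduler:", versions["scheduler"]["host"].get(field))
    for addr, info in versions["workers"].items():
        print(addr, ":", info["host"].get(field))
```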
Can you give me an example of a delayed function which I can run? I tried bag and delayed but they do not seem to work.
Not necessarily; the most important things are the virtual environment and the Python version.
What I would do here is step-by-step testing, from a Python console or notebook (sketched in code after the list):
Create an SSH Cluster
Open the Dashboard, and check that every worker is running correctly
Do a get_versions as you've done
Submit a simple task, like client.submit(lambda x: x+1, 10) and check the Dashboard or get the exception.
First, you should make it work with only localhost; there is no reason to have problems within the same machine if you are pointing at the right Python environment.
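Roughly something like this (a sketch, assuming passwordless SSH to localhost and the same Python environment everywhere):

```python
from dask.distributed import SSHCluster, Client

# 1. Create an SSH Cluster (first host runs the scheduler, the rest run workers)
cluster = SSHCluster(["localhost", "localhost"])
client = Client(cluster)

# 2. Open the Dashboard and check that every worker is running correctly
print(client.dashboard_link)

# 3. Compare environments
client.get_versions(check=True)

# 4. Submit a simple task and check the result (should print 11)
future = client.submit(lambda x: x + 1, 10)
print(future.result())

# A minimal delayed example, once the above works
import dask
from dask import delayed

print(dask.compute(*[delayed(lambda x: x * 2)(i) for i in range(5)]))
```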
Thank you @guillaumeeb for always taking the time to respond.
As per your suggestion, I did:
Create an SSH Cluster - DONE
Open the Dashboard, and check that every worker is running correctly - Unfortunately I am not able to access the dashboard, as it is a RHEL system that I connect to and work on. But when I run the code, I see the workers starting.
Do a get_versions as you've done - I did, and the same two parameters had a mismatch: "OS-release" and "LC_ALL".
Submit a simple task, like client.submit(lambda x: x+1, 10) and check the Dashboard or get the exception. - YES, this command worked and did not raise an exception.
CODE FILES
**Run_script.sh** - shell script that activates the virtual environment and runs the Python code. I run this script using **sh Run_script.sh**, which runs the Python script.
**Test_1.py** - This has a function *def delayed_func* to create the delayed objects (a rough sketch of this wiring follows below).
**Test_2.py** - This Python file has a function `def test_2_function` which is called from the delayed function inside Test_1.py
def test_2_function(json_filename):
* Processes each JSON file
* Reads each row and produces it to a Kafka topic
* Does some processing logic based on successful and failed Kafka records
* The function does not return anything
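To show the wiring, Test_1.py looks roughly like this (a simplified sketch; file names and parameters are placeholders, not my exact code):

```python
# Test_1.py - simplified sketch
import dask
from dask import delayed
from dask.distributed import SSHCluster, Client

from Test_2 import test_2_function

if __name__ == "__main__":
    cluster = SSHCluster(["localhost", "localhost", "localhost", "localhost"])
    client = Client(cluster)

    json_filenames = ["file_1.json", "file_2.json"]  # placeholder input list
    delayed_results = [delayed(test_2_function)(f) for f in json_filenames]
    dask.compute(*delayed_results)
```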
When I execute this, I get the exception:
RuntimeError: Error during deserialization of the task graph. This frequently occurs if the Scheduler and Client have different environments
Question:
Is there something else I should modify in my code, or should I change the function that processes the files?
I tried to provide the node hostnames, which have a shared drive connected between all nodes. Still no luck.
Also, I had one observation, not sure if this is right or not. Since the packages are in the virtual environment, and I am just providing the path to the virtual environment rather than activating it, the executable is a symlink to the system Python path.
So do I need to activate the virtual environment on each worker node before sending the task? How should I activate it?
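For what it's worth, the "providing the path" part looks roughly like this (a sketch; paths and hostnames are placeholders, and I am assuming SSHCluster's remote_python argument is the mechanism used):

```python
from dask.distributed import SSHCluster, Client

venv_python = "/path/to/virtual_env/bin/python"   # placeholder path

cluster = SSHCluster(
    ["node1", "node2", "node3"],   # placeholder hostnames: scheduler + workers
    remote_python=venv_python,     # interpreter used on every node instead of activating the venv
)
client = Client(cluster)
```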
Did you verify that you could gather the Future returned by the first call?
Do you have a more complete stacktrace?
I see you are using a class defined in the file, and a function imported from another file. Are these files in the virtual environment, or just in your current path? I'm wondering if the Workers' environments are correctly seeing and using those. It can be tricky to specify this correctly.
I used client.upload_file("test_2.py"), and then it no longer shows the error and I am able to run the code.
I believe the Python file test_2.py needs to be uploaded to the scheduler and workers using client.upload_file("test_2.py").
I have also removed all the classes and kept Test_1.py as a plain Python file.
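The call I added looks roughly like this (a sketch):

```python
from dask.distributed import Client

client = Client(cluster)           # cluster is the SSHCluster created earlier

# ship test_2.py to the workers so the task graph that references
# test_2_function can be deserialized there
client.upload_file("test_2.py")
```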
For the new Test_1.py code below - removing all classes and using the upload_file() method - I wanted to confirm:
Should this be dask.compute(*delayed_results) or client.compute(*delayed_results) in the code below to process the files in parallel (each file's execution is independent of the others)? I am currently using dask.compute(*delayed_results) to use the nodes created by the SSH cluster.
If you suggest using client.compute(), then how should I ensure all the files are processed before the code moves on to the next part? Should I do something like this? Will this execute all the futures for the delayed objects in parallel?
```python
delayed_result = [delayed(test_2_function)(file, param1, param2) for file in json_filename]
future_results = [client.compute(res) for res in delayed_result]
result = [fs.result() for fs in future_results]
```
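In other words, these are the two variants I am comparing (a sketch; test_2_function, json_filename, param1, param2 and client are as in the snippet above):

```python
import dask
from dask import delayed

delayed_results = [delayed(test_2_function)(file, param1, param2) for file in json_filename]

# Variant 1: blocks until everything is done, using the active Client (the SSH cluster)
results = dask.compute(*delayed_results)

# Variant 2: futures-based; client.compute returns one future per delayed object
# and client.gather waits for all of them
futures = client.compute(delayed_results)
results = client.gather(futures)
```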
Sure @guillaumeeb, I will continue to use dask.compute() to process the data on the SSH cluster in my code.
I was confused about whether using dask.compute() would run the code in local cluster mode, since we are not passing the client to dask.compute(). But since you confirmed, I will use dask.compute() in my code.
Thank you for sending the futures link, I will explore futures too.