I am getting the following stream closed error from the client when reading huge dataframes. The same code works perfectly in a Databricks environment with the Dask framework, but we started getting this issue after porting it to an AKS cluster. It does work for smaller dataframes on AKS, though.
Here is the sample code…
Any help is highly appreciated.
import dask

d = []  # collect one delayed single-entry dict per row
for idx, x in tcm_dataframe[['aaa', 'bbb']].iterrows():
    p = dask.delayed({x['aaa']: x['bbb']})
    d.append(p)
dic = dask.compute(*d)
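For reference, a minimal sketch (illustration only, not part of the original post) of how to estimate how much data each of those per-row delayed tasks carries, using dask.sizeof; large payloads here have to travel through the scheduler and back to the client on gather:

from dask.sizeof import sizeof

# Rough estimate of the in-memory size of each per-row payload before submitting it.
# Large values mean a large task graph and a large gather response.
for idx, x in tcm_dataframe[['aaa', 'bbb']].iterrows():
    print(idx, sizeof(x['bbb']), 'estimated bytes')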
  File "/usr/local/lib/python3.10/site-packages/distributed/core.py", line 1154, in send_recv
    response = await comm.read(deserializers=deserializers)
  File "/usr/local/lib/python3.10/site-packages/distributed/comm/tcp.py", line 236, in read
    convert_stream_closed_error(self, e)
  File "/usr/local/lib/python3.10/site-packages/distributed/comm/tcp.py", line 142, in convert_stream_closed_error
    raise CommClosedError(f"in {obj}: {exc}") from exc
distributed.comm.core.CommClosedError: in <TCP (closed) ConnectionPool.gather local=tcp://.....:... remote=tcp://......:...>: Stream is closed
It’s a bit hard to help from here. It looks like some connection problem: maybe your scheduler node, or your worker nodes, are saturated. What type of hardware (VM size and network) do you have in your AKS cluster? Is it really different from the Databricks one?
Here is the information I have as of now; this is really driving me crazy. There is no useful information from the scheduler or worker nodes. I have even increased the timeouts, suspecting a timeout issue. I am able to run the same code in Databricks with Dask.
I started facing issues on porting the same code (with minor modifications) to the Kubernetes cluster, whereas it works fine in Kubernetes for smaller payloads. This is one of our extreme use cases, where we have a parquet file of 2 GB.
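The exact timeout values I raised are not reproduced here; the snippet below is only an illustration of the kind of dask configuration typically increased in this situation (the values are made up; the default for both comm timeouts is 30s):

import dask

# Illustrative values only; set before creating the Client.
dask.config.set({
    "distributed.comm.timeouts.connect": "60s",
    "distributed.comm.timeouts.tcp": "60s",
    "distributed.comm.retry.count": 3,
})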
distributed.comm.core.CommClosedError: in <TCP (closed) ConnectionPool.gather local=tcp://3:36636 remote=tcp://18786>: Stream is closed
From Worker node:
C:\aks\flask-api - Copy (3)\dask>kubectl logs dask-worker-767746cd5-5vg6t
/usr/local/lib/python3.10/site-packages/distributed/cli/dask_worker.py:266: FutureWarning: dask-worker is deprecated and will be removed in a future release; use dask worker instead
warnings.warn(
2024-08-05 17:29:27,036 - distributed.nanny - INFO - Start Nanny at: 'tcp://:34199'
2024-08-05 17:29:27,698 - distributed.worker - INFO - Start worker at: tcp://46625
2024-08-05 17:29:27,698 - distributed.worker - INFO - Listening to: tcp://46625
2024-08-05 17:29:27,698 - distributed.worker - INFO - dashboard at: :35835
2024-08-05 17:29:27,698 - distributed.worker - INFO - Waiting to connect to: tcp://dask-scheduler:8786
2024-08-05 17:29:27,698 - distributed.worker - INFO - -------------------------------------------------
2024-08-05 17:29:27,698 - distributed.worker - INFO - Threads: 8
2024-08-05 17:29:27,698 - distributed.worker - INFO - Memory: 31.34 GiB
2024-08-05 17:29:27,698 - distributed.worker - INFO - Local Directory: /tmp/dask-scratch-space/worker-ojk2aznp
2024-08-05 17:29:27,698 - distributed.worker - INFO - -------------------------------------------------
2024-08-05 17:29:27,957 - distributed.worker - INFO - Starting Worker plugin shuffle
2024-08-05 17:29:27,958 - distributed.worker - INFO - Registered to: tcp://dask-scheduler:8786
2024-08-05 17:29:27,958 - distributed.worker - INFO - -------------------------------------------------
2024-08-05 17:29:27,958 - distributed.core - INFO - Starting established connection to tcp://dask-scheduler:8786
2024-08-05 17:30:34,090 - distributed.worker - INFO - Starting Worker plugin app/config/config.py81411df7-5fa4-4f68-86ec-ae34c5e593d3
2024-08-05 17:30:34,091 - distributed.utils - INFO - Reload module config from .py file
2024-08-05 17:30:34,133 - distributed.worker - INFO - Starting Worker plugin app/container.py88ae9bf4-d4f6-4003-9faf-03e6f1bd86d3
2024-08-05 17:30:34,135 - distributed.utils - INFO - Reload module container from .py file
2024-08-05 17:30:34,384 - distributed.worker - INFO - Starting Worker plugin app/dask_read.pyab989f56-e357-4449-8745-a043852e9ecc
2024-08-05 17:30:34,385 - distributed.utils - INFO - Reload module dask_read from .py file
2024-08-05 17:30:34,618 - distributed.worker - INFO - Starting Worker plugin app/init.py81919938-ace9-49ac-ab6f-922cadb16955
2024-08-05 17:30:34,619 - distributed.utils - INFO - Reload module init from .py file
2024-08-05 17:30:48,034 - distributed.worker - INFO - Starting Worker plugin app/config/config.py28270b45-f11b-4568-942a-55c6c73407e0
2024-08-05 17:30:48,035 - distributed.utils - INFO - Reload module config from .py file
2024-08-05 17:30:48,046 - distributed.worker - INFO - Starting Worker plugin app/container.py89a9795e-e534-4453-b1d4-461c4303e305
2024-08-05 17:30:48,047 - distributed.utils - INFO - Reload module container from .py file
2024-08-05 17:30:48,059 - distributed.worker - INFO - Starting Worker plugin app/dask_read.py8ae6ad67-dd2a-45d0-9d21-1f94dfe79001
2024-08-05 17:30:48,060 - distributed.utils - INFO - Reload module dask_read from .py file
2024-08-05 17:30:48,065 - distributed.worker - INFO - Starting Worker plugin app/init.py466d343c-7fdc-4fae-9abd-096bb7502916
2024-08-05 17:30:48,067 - distributed.utils - INFO - Reload module init from .py file
2024-08-05 17:31:09,041 - distributed.core - INFO - Event loop was unresponsive in Worker for 3.51s. This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
From Scheduler node:
2024-08-05 17:46:57,999 - distributed.scheduler - INFO - Registering Worker plugin app/init.py127fed45-8a2b-473a-92b8-48b9683c5312
2024-08-05 17:46:58,013 - distributed.scheduler - WARNING - Detected different run_spec for key 'read-8f14e7d143bb2275a813c2c482fac753' between two consecutive calls to update_graph. This can cause failures and deadlocks down the line. Please ensure unique key names. If you are using a standard dask collections, consider releasing all the data before resubmitting another computation. More details and help can be found at https://github.com/dask/dask/issues/9888.
Debugging information
old task state: memory
old run_spec: (<function read at 0x7fdda4f6d1b0>, (‘, ‘2023’, ‘10’), {})
new run_spec: (<function read at 0x7fdda59f5360>, (‘2023’, ‘10’), {})
old token: (‘tuple’, [(‘6e0ed9c6b8577997f345bc2ed1167f27760111b2’, ), (‘tuple’, [’‘’]), (‘dict’, )])
new token: (‘tuple’, [(‘d030f09aa0a5209816e47fe0d539f005066e6980’, ), (‘tuple’, [‘04’]), (‘dict’, )])
old dependencies: set()
new dependencies: set()
2024-08-05 17:47:03,530 - distributed.core - INFO - Connection to tcp://174 has been closed.
2024-08-05 17:47:03,530 - distributed.scheduler - INFO - Remove client Client-b61a48d1-5352-11ef-800f-e6f25a5c86cc
2024-08-05 17:47:03,540 - distributed.core - INFO - Connection to tcp://16624 has been closed.
2024-08-05 17:47:03,540 - distributed.scheduler - INFO - Remove client Client-88264982-5350-11ef-800f-e6f25a5c86cc
2024-08-05 17:47:03,541 - distributed.scheduler - INFO - Close client connection: Client-b61a48d1-5352-11ef-800f-e6f25a5c86cc
2024-08-05 17:47:03,541 - distributed.scheduler - INFO - Close client connection: Client-88264982-5350-11ef-800f-e6f25a5c86cc
The row count is quite small (around 260 rows, across multiple row groups), but the size per row is comparatively high. Hardware details are also visible in the logs above.
I am also running the scheduler and workers from a custom image with the following dependencies; I am not sure whether that creates any problem.
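Not discussed further in the thread, but one quick way to rule out mismatched packages in a custom image is the built-in version check (the scheduler address below is taken from the worker logs above):

from distributed import Client

# Compares dask/distributed/python versions across client, scheduler and workers;
# check=True raises if the required packages disagree.
client = Client("tcp://dask-scheduler:8786")
versions = client.get_versions(check=True)
print(versions["scheduler"]["packages"])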
This is not really good, but 3.5s is small enough and should not be a problem.
You have a lot of Worker plugins; what are they for?
This is really strange too…
Only 260 rows? So you just launch one delayed task per row? But this means you are sending big objects through the network connection?
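A minimal sketch of the alternative being hinted at here, assuming the data can simply be re-read from the parquet file on the workers (the path is a placeholder): build one small task per partition instead of embedding the large per-row objects in the graph that the client ships to the scheduler.

import dask
import dask.dataframe as dd

# Placeholder path; each partition/row group is read directly on a worker.
ddf = dd.read_parquet("path/to/data.parquet", columns=["aaa", "bbb"])

# One delayed task per partition rather than one per row.
parts = [
    dask.delayed(lambda pdf: dict(zip(pdf["aaa"], pdf["bbb"])))(part)
    for part in ddf.to_delayed()
]
dicts = dask.compute(*parts)
result = {k: v for d in dicts for k, v in d.items()}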
I don’t see anything about the hardware. What are the VM flavours in the Kubernetes cluster? From where do you launch your computation: inside Kubernetes, from a local client? How many workers, and what worker size?