I tried to use Dask to process spatial data. Everything works fine locally with a small dataset, but when I ran a bigger dataset on a Kubernetes cluster, a hash-join-transfer task was created inside Dask. I never see it in my high-level graphs with a small dataset or when running locally. How can I get rid of this issue?
This traceback was found in my worker logs:
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/distributed/worker.py", line 2294, in execute
    result = await self.loop.run_in_executor(
  File "/usr/local/lib/python3.10/dist-packages/loky/process_executor.py", line 370, in _sendback_result
    result_queue.put(
  File "/usr/local/lib/python3.10/dist-packages/loky/backend/queues.py", line 230, in put
    obj = dumps(obj, reducers=self._reducers)
  File "/usr/local/lib/python3.10/dist-packages/loky/backend/reduction.py", line 215, in dumps
    dump(obj, buf, reducers=reducers, protocol=protocol)
  File "/usr/local/lib/python3.10/dist-packages/loky/backend/reduction.py", line 208, in dump
    _LokyPickler(file, reducers=reducers, protocol=protocol).dump(obj)
  File "/usr/local/lib/python3.10/dist-packages/cloudpickle/cloudpickle.py", line 1245, in dump
    return super().dump(obj)
TypeError: cannot pickle '_contextvars.ContextVar' object
Exception during execution of task ('hash-join-transfer-ba68a5e8de7d48b2e6697dd2a27dd41e', 2)
I also saw in the error logs that the scheduler died, but I still don't understand why force-closing all the workers would cause this error.
Also, all my computation results are supposed to stay on the workers after dask read_parquet, and my only compute call is the snippet below; I believe only boolean values are returned. Any idea what causes signal 15 on the scheduler?
def save(gdf, layer, path):
    …
    return True

dfs = features.to_delayed()
writes = [delayed(save)(df, layer, path) for df in dfs]
for write in writes:
    dd.compute(write)
Scheduler and worker logs:
Scheduler closing due to signal-15…
Scheduler closing all comms
…
Lost all workers
Stopped scheduler at 'tcp://10.244.24.152:8786'
End scheduler
Stopping worker at tcp://10.244.178.2:36215. Reason: scheduler-close
Failed to communicate with scheduler during heartbeat.
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/distributed/core.py", line 1558, in _connect
    async def _connect(self, addr: str, timeout: float | None = None) -> Comm:
asyncio.exceptions.CancelledError
…
For the first one, you are probably using some dependency that cannot be serialized (pickled). This might not show up with a local threaded scheduler, but it does come up with a distributed one. Also, it seems you are using loky on top of Dask?
For the second part, it seems something is killing the scheduler, but this is probably not due to your code; it's hard to tell from the current explanation. How do you create the Kubernetes cluster? On what instances?
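To illustrate the class of problem: the traceback ends with `TypeError: cannot pickle '_contextvars.ContextVar' object`, and `ContextVar` instances indeed cannot be pickled. A minimal stdlib-only reproduction of the same error (the variable name here is just an example):

```python
import contextvars
import pickle

# ContextVar instances have no pickle support, so sending one
# between processes (as a distributed scheduler or loky must do)
# fails with a TypeError.
var = contextvars.ContextVar("my_var")

try:
    pickle.dumps(var)
except TypeError as exc:
    # Same class of error as in the worker log:
    # "cannot pickle 'ContextVar' object"
    print(exc)
```

A threaded local scheduler never serializes tasks, which is why the problem only appears on the distributed cluster.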
Also, just a quick improvement on your code: to compute all the delayed objects at once, you should use:
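(The code sample appears to have been lost in the quote; judging from the follow-up reply that mentions `dd.compute(writes)`, the suggestion is presumably to submit all the delayed writes in one compute call rather than looping. A minimal sketch under that assumption, with hypothetical placeholder partitions standing in for the real `features.to_delayed()` output:)

```python
import dask
from dask import delayed

def save(gdf, layer, path):
    # placeholder for the real write logic
    return True

# hypothetical stand-ins for the real dataframe partitions
dfs = [delayed(lambda i=i: i)() for i in range(3)]

writes = [delayed(save)(df, "layer", "path") for df in dfs]

# One compute call submits a single graph, letting the scheduler
# run all writes in parallel, instead of one round-trip per partition.
results = dask.compute(*writes)
print(results)  # (True, True, True)
```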
Hi @guillaumeeb,
Yes, I did use loky. At the beginning, I tried to investigate what caused the killed workers, and I saw the guide on the page Plugins — Dask.distributed 2024.3.1 documentation, where loky is recommended for process executors, so I used them both. Should I use a nanny instead? What is the best practice for setting up the scheduler and workers for processing data?
I also found that when I used a nanny I could observe high unmanaged memory, but I didn't see it with loky. I wonder which result is correct?
I created the k8s cluster by using kubectl to apply the scheduler and worker configurations on Azure, e.g.
kubectl apply -f k8s-cluster.yaml
After the cluster is created, I start a k8s job to connect to it. At the beginning I tried to create the cluster directly by using Dask Operator calls in the script, but it always showed me "403 Forbidden" or other errors. I think perhaps I haven't set up the service account for querying the Dask Operator.
Yes, I did use "dd.compute(writes)" before, but I wanted to check whether something was causing the killed workers.
I also wonder if I can do something like this. Will it still keep them as two different delayed objects?
dfs1, dfs2 = dd.compute([dfs1, dfs2])
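For context, a small sketch of the two call shapes (using `dask.compute`, which I believe `dd.compute` re-exports; the delayed values here are toy placeholders). Passing a single list returns a one-element tuple wrapping the list, so unpacking it into two names as written above would fail, while passing the collections as separate arguments yields one result per argument:

```python
import dask
from dask import delayed

a = delayed(lambda: 1)()
b = delayed(lambda: 2)()

# Separate arguments: one result per argument.
x, y = dask.compute(a, b)
print(x, y)  # 1 2

# A single list argument: a one-element tuple containing the list,
# so `x, y = dask.compute([a, b])` would raise a ValueError on unpacking.
(results,) = dask.compute([a, b])
print(results)  # [1, 2]
```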