Can't pickle _contextvars.ContextVar during execution of task hash-join-transfer

Hi,

I'm using Dask to process spatial data, and everything works fine locally with a small dataset. But when I ran a bigger dataset on a k8s cluster, a hash-join-transfer task was created inside Dask. I never see it in my high-level graphs with a small dataset or when running locally. How can I get rid of this issue?

This traceback was found in my worker logs:

```
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/distributed/worker.py", line 2294, in execute
    result = await self.loop.run_in_executor(
  File "/usr/local/lib/python3.10/dist-packages/loky/process_executor.py", line 370, in _sendback_result
    result_queue.put(
  File "/usr/local/lib/python3.10/dist-packages/loky/backend/queues.py", line 230, in put
    obj = dumps(obj, reducers=self._reducers)
  File "/usr/local/lib/python3.10/dist-packages/loky/backend/reduction.py", line 215, in dumps
    dump(obj, buf, reducers=reducers, protocol=protocol)
  File "/usr/local/lib/python3.10/dist-packages/loky/backend/reduction.py", line 208, in dump
    _LokyPickler(file, reducers=reducers, protocol=protocol).dump(obj)
  File "/usr/local/lib/python3.10/dist-packages/cloudpickle/cloudpickle.py", line 1245, in dump
    return super().dump(obj)
TypeError: cannot pickle '_contextvars.ContextVar' object
Exception during execution of task ('hash-join-transfer-ba68a5e8de7d48b2e6697dd2a27dd41e', 2)
```

My packages:
toolz 0.12.1
dask 2024.2.1
dask-geopandas 0.3.1
dask-kubernetes 2024.3.1
loky 3.4.1

I just saw in the error logs that the scheduler died, but I still don't know why force-closing all the workers would cause this error.

Also, all my computation results are supposed to stay on the workers after dask read_parquet, and my only compute call is in the snippet below; I believe only boolean values are returned. Any idea what causes signal 15 on the scheduler?

```python
def save(gdf, layer, path):
    # ... write the GeoDataFrame to the given layer/path ...
    return True

dfs = features.to_delayed()
writes = [delayed(save)(df, layer, path) for df in dfs]
for write in writes:
    dd.compute(write)
```
The scheduler and worker logs:

```
Scheduler closing due to signal-15...
Scheduler closing all comms

Lost all workers
Stopped scheduler at 'tcp://10.244.24.152:8786'
End scheduler
Stopping worker at tcp://10.244.178.2:36215. Reason: scheduler-close
Failed to communicate with scheduler during heartbeat.
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/distributed/core.py", line 1558, in _connect
    async def _connect(self, addr: str, timeout: float | None = None) -> Comm:
asyncio.exceptions.CancelledError
```

Hi @Sam, welcome to the Dask community!

It seems you have two different problems here.

For the first one, you are probably using some dependency that cannot be serialized (pickled), which may not show up with a local threaded scheduler but does come up with a distributed one. Also, it seems you are using loky on top of Dask?
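
A quick way to narrow this down is to try round-tripping the objects you send to workers through cloudpickle yourself (a minimal diagnostic sketch, not a Dask API; the candidate objects here are just illustrations):

```python
import contextvars

import cloudpickle

# Try to round-trip each object you ship to workers; anything that holds
# a ContextVar raises the same TypeError seen in the worker logs.
candidates = {
    "plain data": {"a": 1},
    "context var": contextvars.ContextVar("cv"),  # stand-in for the culprit
}
for name, obj in candidates.items():
    try:
        cloudpickle.loads(cloudpickle.dumps(obj))
        print(f"{name}: picklable")
    except TypeError as exc:
        print(f"{name}: NOT picklable ({exc})")
```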

For the second part, it seems something is killing the scheduler, but this is probably not due to your code; it's hard to tell from the current explanation. How do you create the K8S cluster? On what instances?

Also, just a quick improvement to your code: to compute all the delayed objects at once, you should use

dd.compute(*writes)

instead of a for loop.
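
For example, a sketch reusing the names from your snippet (so `features`, `layer`, `path` and `save` are assumed from your code):

```python
# Build all the delayed writes, then compute them in one call so the
# scheduler can run them in parallel rather than one graph per iteration.
dfs = features.to_delayed()
writes = [delayed(save)(df, layer, path) for df in dfs]
results = dd.compute(*writes)  # a tuple of booleans, one per partition
```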

Hi @guillaumeeb,
Yes, I did use loky. At the beginning I tried to investigate what caused the killed workers; I saw the guide on the page Plugins — Dask.distributed 2024.3.1 documentation, where loky is recommended for process executors, so I used them both. Should I use the Nanny? What is the best practice for setting up the scheduler and workers for processing data?

I also found that when I used the Nanny I could observe high unmanaged memory, but I didn't see it with loky. I wonder which reading is correct?

I created the k8s cluster by using kubectl to apply the scheduler and worker configurations on Azure, e.g.

kubectl apply -f k8s-cluster.yaml

After the cluster is created, I start a k8s job that connects to it. Initially I tried to create the cluster directly with dask operator calls in my script, but it always gave me "403 Forbidden" or other errors. I think perhaps I haven't set up the service account for querying the dask operator.
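
For reference, I believe the missing piece looks roughly like this (a rough RBAC sketch; the names are placeholders and the rules are my guess based on the dask-kubernetes docs, not a verified configuration):

```yaml
# Grant a service account permission to manage the Dask operator's
# custom resources; all names here are placeholders.
apiVersion: v1
kind: ServiceAccount
metadata:
  name: dask-client
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: dask-client
rules:
  - apiGroups: ["kubernetes.dask.org"]
    resources: ["daskclusters", "daskworkergroups", "daskjobs", "daskautoscalers"]
    verbs: ["get", "list", "watch", "create", "patch", "delete"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: dask-client
subjects:
  - kind: ServiceAccount
    name: dask-client
roleRef:
  kind: Role
  name: dask-client
  apiGroup: rbac.authorization.k8s.io
```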

Yes, I did use "dd.compute(writes)" before, but I wanted to check whether a particular write caused the killed workers.
I also wonder if I can do something like this. Will the results still remain two different objects?

dfs1, dfs2 = dd.compute(dfs1, dfs2)

Sam

I think you shouldn't try to use loky if it's not necessary for you; just stick with Dask's default mechanisms for now.

I'm not sure about the memory tracking when using loky, but with the default Nanny it should be correct, and that unmanaged memory might be the cause of your problem.

Yes, it should: dd.compute(dfs1, dfs2) computes both collections in one pass and returns two separate results.
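
A minimal sketch of that pattern (the toy frames here are just illustrations):

```python
import dask
import dask.dataframe as dd

# Two independent Dask collections.
df1 = dd.from_dict({"a": range(4)}, npartitions=2)
df2 = dd.from_dict({"b": range(4)}, npartitions=2)

# One compute call shares work between the two graphs and returns the
# results in the same order, as two separate pandas DataFrames.
res1, res2 = dask.compute(df1, df2)
```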