Hey there. I'm new to Dask, but I successfully set up dask/dask on a Kubernetes cluster with Helm.
I have been running into an issue where workers die when I call `client.map(f, files)`. The files are large (about 1 GB each), and the function `f` loads a file and pushes it to a Zarr store.
Watching the dashboard, I notice that as `len(files)` grows, the number of tasks per worker increases, memory approaches the limit specified under `resources` in `values.yaml` for `helm install`, and then the worker dies and everything fails.
So I thought it might be a good idea to specify resources per task, with something like:

```python
futures = client.map(f, files, resources={'MEMORY': 2e9})
```

but the tasks never submit and the dashboard shows no new tasks. Am I missing something?
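One guess: maybe the tasks never run because none of my workers actually advertise a `MEMORY` resource? Here is a minimal local sketch of what I think would be needed (the `LocalCluster` setup and the stub `f`/`files` are just for illustration, not my actual Helm deployment):

```python
# Minimal local sketch, NOT my Helm setup: as I understand the docs,
# a task with a resource constraint only runs on workers that
# advertise that resource, so maybe mine just never match.
from dask.distributed import Client, LocalCluster

def f(path):
    # stand-in for my real load-and-push-to-zarr function
    return path

files = ["a.nc", "b.nc", "c.nc", "d.nc"]  # stand-in for my real file list

if __name__ == "__main__":
    # each worker advertises 4e9 units of an abstract "MEMORY" resource
    cluster = LocalCluster(n_workers=2, resources={"MEMORY": 4e9})
    client = Client(cluster)

    # each task claims 2e9, so at most two of these run per worker at once
    futures = client.map(f, files, resources={"MEMORY": 2e9})
    print(client.gather(futures))
```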
How does Dask know to stop submitting tasks to a worker if they would push it over its allowed memory? Is this a nuance of `client.map`? Should I use `dask.delayed` or `client.submit` instead?
I can avoid this by running `submit` and `get` over batches to make sure no worker gets more than `k` tasks at a time, but I feel like I am reinventing the wheel here…
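Concretely, the batching workaround looks something like this rough sketch (`k` is just a hand-tuned cap, and `client`, `f`, and `files` are as above):

```python
k = 4  # hand-tuned cap on in-flight tasks, so memory stays bounded

results = []
for i in range(0, len(files), k):
    batch = files[i : i + k]
    futures = [client.submit(f, path) for path in batch]
    results.extend(client.gather(futures))  # block until this batch finishes
```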