Scale>1 fails: "shut down workers that don't have promised keys"

Hi All,
I am new to dask, but I am getting the hang of it and I plan to continue to use it. I am using async SLURMCluster and I am experimenting with the adapt option. Everything works fine when I am using a single worker. However, when the scaling kicks in, it always fails with:

shut down workers that don't have promised keys and also distributed.scheduler.KilledWorker: ('build_multipolygon_from_raster_window-fdbad344-2072-4428-b0d5-561366e4e6e4', <WorkerState 'tcp://128.239.57.11:43980', name: SLURMCluster-0-4, ...

Can anyone chime in on why my workflow works on a single node but crashes when I request scaling > 1 or an adapt maximum > 2?

Hi,

You tripped a failsafe in the cluster coherence checks. It’s something that should never happen.
Could you come up with a minimal reproducer and open a ticket on Issues · dask/distributed · GitHub?

Thanks


Hi, thanks for your reply!

I will try to pin down what exact conditions, if any, make this problem reproducible. This is unfortunately not the only issue I’ve run into with dask, so I am slightly overwhelmed trying to figure out how to make it all work. I did, however, start some discussion in this GitHub link. It may be the wrong place to post the ideas, but to be honest it’s a lot for me to digest at the moment. Dask looks promising, but it might not support the full range of my needs, or I may just not know enough about it yet.

Any advice on how to proceed will be greatly appreciated!

Hi @jreniel,

There are a lot of questions in your post and GitHub link; I’ll try to give some answers.

First: do not use adapt mode unless you know exactly what you’re doing. It’s a wow-effect feature, but it’s often not useful and complicates things.

But it seems from your GitHub comment (probably not the right place for it) that you also experience issues when only using scaling.

So a second thing you could try: wait for some workers to be started before submitting your processing. There’s a method in Client for that if I recall correctly (`Client.wait_for_workers`). Dask is sometimes a bit fragile when lots of workers come and go at the same time (though I agree it should be more robust, so if you have a reproducible example, go for it).
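As a rough sketch of that pattern, here is the poll-until-ready logic in pure Python. `count_workers` is a hypothetical stand-in for asking the scheduler how many workers have registered; with a real cluster you would just call `Client.wait_for_workers(n_workers)` instead of rolling your own:

```python
import time

def wait_for_workers(count_workers, n_workers, timeout=60.0, poll=0.5):
    """Block until count_workers() reports at least n_workers, or time out."""
    deadline = time.monotonic() + timeout
    while count_workers() < n_workers:
        if time.monotonic() >= deadline:
            raise TimeoutError(f"fewer than {n_workers} workers after {timeout}s")
        time.sleep(poll)

# Simulated cluster for illustration: one more "worker" has registered
# every time we poll.
registered = []
def count_workers():
    registered.append(object())
    return len(registered)

wait_for_workers(count_workers, n_workers=3, timeout=5.0)
```

Only once this returns do you start calling `client.submit`/`client.map`, so the first tasks are not scheduled onto a half-started cluster.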

As for the rest, it seems your problem is kind of embarrassingly parallel, so it should work quite easily with Dask. I’m just commenting on some of the GitHub lines below.

uses a custom window generation, because the default dask-array blocks may be too small,

Dask Array just tries to find a good block size from your input data, based on metadata, volume, and other attributes. It is usually a very good idea to give it hints if you know your data well.
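To illustrate what custom window generation amounts to, here is a minimal pure-Python sketch (the 10000×10000 raster and 4096×4096 window size are made-up numbers for the example). With Dask Array you would express the same hint by passing an explicit `chunks=(4096, 4096)` when creating the array:

```python
def raster_windows(height, width, win_h, win_w):
    """Yield (row_start, row_stop, col_start, col_stop) windows covering a
    height x width raster; windows on the bottom/right edges may be smaller."""
    for r in range(0, height, win_h):
        for c in range(0, width, win_w):
            yield (r, min(r + win_h, height), c, min(c + win_w, width))

# Example: a 10000 x 10000 raster split into 4096 x 4096 windows
# gives a 3 x 3 grid of windows covering every pixel exactly once.
wins = list(raster_windows(10000, 10000, 4096, 4096))
```

Larger windows mean fewer tasks and less scheduler overhead, at the cost of more memory per task, which is exactly the trade-off the chunk-size hint controls.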

when the list is very long, Nanny’s fail to start with time outs (semi-random)

Nanny failures often come from too many workers starting at the same time. Again, maybe don’t use adapt, and try scaling to a fixed number of workers instead. The list length should only matter with adapt; otherwise, list length and Nanny timeouts are unrelated.

stuck in a loop going from 100% back to ~60%, rinse and repeat

An operation is failing: either adaptive mode is killing workers, workers are dying for some other reason (often out of memory), or there is a problem in your code. When a worker dies, the tasks it held have to be recomputed, which is why progress falls back from 100% to ~60% and repeats.

In particular I need to submit a delayed function that uses multiprocessing.Pool internally, but that doesn’t work because I get an error: AssertionError: daemonic processes are not allowed to have children

I proposed a solution in another post. Additionally, have you had a look at the `--no-nanny` flag? It might also fix your issue, at the cost of a bit of reliability.
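For context on that error: worker processes launched under a Nanny are daemonic, and Python forbids daemonic processes from spawning children, which is exactly what `multiprocessing.Pool` tries to do. One hedged workaround sketch (not the only option, and the `run_parallel`/`square` names here are just for illustration) is to detect the daemonic case and fall back to a thread-based pool, which is always allowed:

```python
import multiprocessing
from multiprocessing.dummy import Pool as ThreadPool  # thread-backed Pool API

def square(x):
    return x * x

def run_parallel(func, items, processes=4):
    # Inside a Nanny-managed Dask worker, current_process().daemon is True,
    # and creating a multiprocessing.Pool there raises
    # "AssertionError: daemonic processes are not allowed to have children".
    # Fall back to threads in that case; use real processes otherwise.
    if multiprocessing.current_process().daemon:
        pool_cls = ThreadPool
    else:
        pool_cls = multiprocessing.Pool
    with pool_cls(processes) as pool:
        return pool.map(func, items)
```

Threads won’t help CPU-bound pure-Python work because of the GIL, which is why letting Dask itself do the parallelism (or running workers with `--no-nanny`) is usually the cleaner fix.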

Best,
Guillaume.


Hi @guillaumeeb,

Thanks for these pointers; they confirm some of my suspicions and will help me make better design decisions. These are very useful comments, and I appreciate the time you took to address my questions.
