Workers get restarted but there are no error messages etc

I can’t figure out why the workers are getting restarted or how can I get this information? It doesn’t show the stderr or anything.

All configurations are on default. And the task I’m trying to do is a simple text classification.

Because of these workers are getting restarted, with dask.config.set(scheduler='single-threaded'): gets completed faster than distributed.

Also the restarts are unpredictable; sometimes, with the same task, workers get restarted x times, sometimtes y times, etc.

Hi @daskee,

With distributed, you should be able to view the Worker logs through the dashboard, do you have access to it? You should also view the error logs if the computation crash too many times and is interrupted, but this seems not to be the case.

The most common case of Worker restart is a memory problem.

Aside from taht, it would be much easier to help with some reproducer, do you think you could build one?

Hi @guillaumeeb ,

Yes I’m monitoring the workers’ logs from e.g. during training. But despite of the nanny says “restarted”, it’s not actually the same worker. The worker gets shut down and another worker spawns to replace it with different port and fresh logs. So as soon as the worker gets “restarted”, the URL for worker logs I just write becomes unreachable.

This is a worker’s logs during training:

I also monitor the workers from tab and the memory doesn’t seem to reach too high at all. You can see how they restarted (or “replaced”) here:

And a reproducer might be difficult to create because I read data from parquet which has text and label column. But the ending array is like this:

Hi, if I do the maths correctly, your output chunk sizes weight 45GiB! This is really huge and could clearly the cause of worker restarting.

Yes that’s a big sparse array, but:

  • Still, how could I see this without searching the problem, I mean there should be a log like " this worker has killed because it tried to allocate too much memory" or something?
  • If this is because of memory, how does it work with with dask.config.set(scheduler='single-threaded'): without any errors? I have 16G RAM
  • I tried:
x = x_train.rechunk(block_size_limit=100e6)
y = train['label'].to_dask_array(lengths=True)
y = y.rechunk((x.chunksize[0],))

does this make sense?

I guess it is crashing too fast while trying to allocate memory.

That is a really good question!

Yes it does, but ideally, you should configure appropriate chunk size when reading the data. How do you read input data in the first place?

The rechunk may arrive too late, after Dask tries to load a big chunk in memory before splitting it into smaller ones.

Hi, so sorry that I totally missed your comment.

I read data from partitioned parquet files. I repartitioned the data so that each part is 100 MB in size.

After transformation (preprocessing, vectorization etc.), it’s reasonable that the new vector array may have different sizes (in bytes) from the original files; it makes sense.

But the mystrey of my questions 1 and 2 still on :slight_smile:

What is the original partition size?

Yes but be careful, it must not be to big at this point!

The error might be triggered at operating system level, with an oom_killer mechanisms, leaving no chance to Dask to capture it.

Maybe it’s still a problem of chunk size, but less than the 45GiB I’m seeing above. Or it might mean it’s something totally different. The only way to know would be to have some reproducer with representative data sizes.