How to make the worker fail the computation when the memory limit is reached?

Currently, when a certain memory limit is reached, the worker nanny restarts the worker, dropping all existing state and redoing the jobs. The host program does not know this has happened and will just keep waiting.

My goal is to find a way to let the host program know about this (e.g. via an exception) so that it can move on.

@ubw218 Thanks for this question! Could you please share the error traceback? And, if possible, a minimal reproducible example? It’ll allow us to help you better!


What you’re looking for is the distributed.scheduler.allowed-failures config variable.
May I ask why you don’t want resilience to OOM? Is it to speed up the development process, or is it something that’s desirable in production? In the latter case, why?
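For instance, you could lower the retry budget when starting the scheduler. A sketch using Dask's double-underscore environment-variable convention for nested config keys (value of 0 chosen for illustration):

```shell
# Fail a task as soon as its worker dies once, instead of retrying it.
# Dask maps nested config keys to environment variables with double underscores.
export DASK_DISTRIBUTED__SCHEDULER__ALLOWED_FAILURES=0
dask-scheduler
```

The same key can also be set in `~/.config/dask/distributed.yaml` or via `dask.config.set` in Python.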


Thanks @crusaderky!
It is more of the latter. In our “production” environment, different users submit jobs to the same pipeline. If one of the jobs is unreasonably big, we only want to run it once, conclude “OK, our resources are not enough to run that anyway”, and we are happy to let that job fail and give the resources to other users’ jobs.

I did find distributed.scheduler.allowed-failures, whose default is 3. But my experiment below shows that the number of attempts is somewhat random (6, 8, or 10 times) before the scheduler gives up. The code is below.

In terminal:

dask-scheduler

followed by

dask-worker tcp://127.0.0.1:8786 --nprocs 1 --nthreads 1 --memory-limit=200MiB

^The worker is given very little memory to make the issue easier to reproduce.

The python code

import numpy as np
import pandas as pd
from dask import delayed
from distributed import Client


def f():
    print("running f")
    print("-----")
    print("-----")
    print("-----")
    print("-----")
    # ~80 MB of float64 data; together with process overhead this pushes
    # the worker past its 200 MiB limit and triggers a nanny restart
    df = pd.DataFrame(dict(row_id=np.zeros(10000000)))
    return df


def main():
    with Client(address='tcp://127.0.0.1:8786') as client:
        # Submit f as a single delayed task and block until it completes
        d_object = delayed(f)()
        print(d_object.compute(scheduler=client))


if __name__ == "__main__":
    main()

The output prints the following:

distributed.client - WARNING - Couldn't gather 1 keys, rescheduling {'f-29283ad1-da89-42db-b065-0bc69f0e7cf7': ('tcp://127.0.0.1:62835',)}
distributed.client - WARNING - Couldn't gather 1 keys, rescheduling {'f-29283ad1-da89-42db-b065-0bc69f0e7cf7': ('tcp://127.0.0.1:63054',)}
distributed.client - WARNING - Couldn't gather 1 keys, rescheduling {'f-29283ad1-da89-42db-b065-0bc69f0e7cf7': ('tcp://127.0.0.1:63058',)}
distributed.client - WARNING - Couldn't gather 1 keys, rescheduling {'f-29283ad1-da89-42db-b065-0bc69f0e7cf7': ('tcp://127.0.0.1:63062',)}
distributed.client - WARNING - Couldn't gather 1 keys, rescheduling {'f-29283ad1-da89-42db-b065-0bc69f0e7cf7': ('tcp://127.0.0.1:63066',)}
distributed.client - WARNING - Couldn't gather 1 keys, rescheduling {'f-29283ad1-da89-42db-b065-0bc69f0e7cf7': ('tcp://127.0.0.1:63073',)}
distributed.client - WARNING - Couldn't gather 1 keys, rescheduling {'f-29283ad1-da89-42db-b065-0bc69f0e7cf7': ('tcp://127.0.0.1:63077',)}
distributed.client - WARNING - Couldn't gather 1 keys, rescheduling {'f-29283ad1-da89-42db-b065-0bc69f0e7cf7': ('tcp://127.0.0.1:63081',)}
distributed.client - WARNING - Couldn't gather 1 keys, rescheduling {'f-29283ad1-da89-42db-b065-0bc69f0e7cf7': ('tcp://127.0.0.1:63085',)}
distributed.client - WARNING - Couldn't gather 1 keys, rescheduling {'f-29283ad1-da89-42db-b065-0bc69f0e7cf7': ('tcp://127.0.0.1:63089',)}
distributed.client - WARNING - Couldn't gather 1 keys, rescheduling {'f-29283ad1-da89-42db-b065-0bc69f0e7cf7': ('tcp://127.0.0.1:63098',)}