as_completed can get stuck in an infinite loop

This is a follow-up test after "How to make the worker fail the computation when memory limit is reached?".

I disabled retries for failed jobs and ran the following toy program on 1 worker with 1 thread and 180 MiB of RAM. Quite often the program gets stuck in as_completed.__next__() because the as_completed object itself never becomes empty. I'll post all the stack traces and screenshots at the end.

I should point out that the issue is timing-related: about half of the time, the program exits with the expected exception.

The dask version is 2021.12.0.

Toy program:

import numpy as np
import pandas as pd
from dask import delayed
from distributed import Client, futures_of, as_completed


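@delayed
def f1():
    print("running f1")
    # 10,000,000 float64 zeros is about 80 MB of raw data, which is what
    # drives the 180 MiB worker over its memory limit in the logs below.
    df = pd.DataFrame(dict(row_id=np.zeros(10000000)))
    return df


def main():
    with Client(address='tcp://127.0.0.1:8786') as client:
        futures = futures_of(client.persist(f1()))
        # This loop is where the program hangs when the worker is killed.
        for future, result in as_completed(futures, loop=client.loop, with_results=True):
            print(future)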


if __name__ == "__main__":
    main()
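
Until the hang itself is understood, one possible stopgap is to poll the futures' status with a deadline instead of iterating over as_completed. The sketch below is only that, a sketch: wait_for_terminal and TERMINAL_STATES are hypothetical names, and it assumes each future exposes a .status string with values such as "pending", "finished", "error", "cancelled" or "lost", as distributed's Future does. It does not fix the underlying problem; it only bounds how long the client waits.

```python
import time

# Status strings assumed to mean "this future will not change any more".
TERMINAL_STATES = {"finished", "error", "cancelled", "lost"}


def wait_for_terminal(futures, timeout=60.0, poll_interval=0.1):
    """Poll until every future is in a terminal state or the deadline passes.

    Returns a dict mapping status -> list of futures. Futures that are still
    not terminal at the deadline are grouped under the key "timed-out".
    """
    deadline = time.monotonic() + timeout
    pending = list(futures)
    grouped = {}
    while pending:
        still_pending = []
        for fut in pending:
            if fut.status in TERMINAL_STATES:
                grouped.setdefault(fut.status, []).append(fut)
            else:
                still_pending.append(fut)
        pending = still_pending
        if not pending or time.monotonic() >= deadline:
            break
        time.sleep(poll_interval)
    if pending:
        grouped["timed-out"] = pending
    return grouped
```

In the toy program this would replace the as_completed loop, for example grouped = wait_for_terminal(futures, timeout=120), after which anything under "error" or "timed-out" can be inspected or cancelled explicitly.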

Program log:

distributed.client - WARNING - Couldn't gather 1 keys, rescheduling {'f1-13a8f680-91b5-447e-8bca-d271f5a06562': ('tcp://127.0.0.1:49938',)}
distributed.client - WARNING - Couldn't gather 1 keys, rescheduling {'f1-13a8f680-91b5-447e-8bca-d271f5a06562': ('tcp://127.0.0.1:49947',)}
tornado.application - ERROR - Exception in callback functools.partial(<bound method IOLoop._discard_future_result of <tornado.platform.asyncio.AsyncIOLoop object at 0x142c9c310>>, <Task finished name='Task-13' coro=<as_completed._track_future() done, defined at /Users/my_name/virtual_env/my_project/lib/python3.8/site-packages/distributed/client.py:4566> exception=KilledWorker('f1-13a8f680-91b5-447e-8bca-d271f5a06562', <WorkerState 'tcp://127.0.0.1:49951', name: tcp://127.0.0.1:49951, status: closed, memory: 0, processing: 1>)>)
Traceback (most recent call last):
  File "/Users/my_name/virtual_env/my_project/lib/python3.8/site-packages/tornado/ioloop.py", line 741, in _run_callback
    ret = callback()
  File "/Users/my_name/virtual_env/my_project/lib/python3.8/site-packages/tornado/ioloop.py", line 765, in _discard_future_result
    future.result()
  File "/Users/my_name/virtual_env/my_project/lib/python3.8/site-packages/distributed/client.py", line 4573, in _track_future
    result = await future._result(raiseit=False)
  File "/Users/my_name/virtual_env/my_project/lib/python3.8/site-packages/distributed/client.py", line 260, in _result
    result = await self.client._gather([self])
  File "/Users/my_name/virtual_env/my_project/lib/python3.8/site-packages/distributed/client.py", line 1811, in _gather
    raise exception.with_traceback(traceback)
distributed.scheduler.KilledWorker: ('f1-13a8f680-91b5-447e-8bca-d271f5a06562', <WorkerState 'tcp://127.0.0.1:49951', name: tcp://127.0.0.1:49951, status: closed, memory: 0, processing: 1>)

After sending keyboard interrupt:

Traceback (most recent call last):
  File "/Users/my_name/git/my_project/toy11.py", line 22, in <module>
    main()
  File "/Users/my_name/git/my_project/toy11.py", line 17, in main
    for future, result in as_completed(futures, loop=client.loop, with_results=True):
  File "/Users/my_name/virtual_env/my_project/lib/python3.8/site-packages/distributed/client.py", line 4653, in __next__
    self.thread_condition.wait(timeout=0.100)
  File "/Users/my_name/.pyenv/versions/3.8.9/lib/python3.8/threading.py", line 306, in wait
    gotit = waiter.acquire(True, timeout)
KeyboardInterrupt
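
My reading of the two tracebacks, sketched as a toy model: KilledWorker escapes from _track_future at the await line, so whatever bookkeeping follows in that coroutine never runs, while __next__ keeps waking up every 0.1 s to check for a result that will never arrive. The code below is not distributed's implementation. ToyAsCompleted and its methods are made-up names, and threads stand in for the event loop, purely to show how a tracker that dies early leaves the consumer waiting.

```python
import queue
import threading


class ToyAsCompleted:
    """Toy stand-in for the pattern (not distributed's code)."""

    def __init__(self, jobs):
        self.outstanding = len(jobs)  # jobs not yet handed to the consumer
        self.done = queue.Queue()
        self.threads = [
            threading.Thread(target=self._track, args=(job,), daemon=True)
            for job in jobs
        ]
        for thread in self.threads:
            thread.start()

    def _track(self, job):
        try:
            value = job()
        except Exception:
            # Models the exception escaping the tracker: the tracker stops
            # here, so nothing is ever queued for this job.
            return
        self.done.put(value)

    def next_result(self, max_polls):
        """Poll for a result; a real consumer would poll without a bound."""
        for _ in range(max_polls):
            try:
                value = self.done.get(timeout=0.01)
            except queue.Empty:
                continue
            self.outstanding -= 1
            return value
        return None  # gave up; an unbounded loop would hang here


def ok():
    return "ok"


def killed():
    raise RuntimeError("worker killed")


toy = ToyAsCompleted([ok, killed])
first = toy.next_result(max_polls=50)   # the successful job arrives
second = toy.next_result(max_polls=50)  # the failed job never arrives
print(first, second, toy.outstanding)
```

In this model the outstanding count stays at 1 after the failed job, which matches the symptom described above: the iterator never considers itself empty, so the loop never ends.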

Scheduler log:

distributed.scheduler - INFO - Receive client connection: Client-3fcb1c22-b513-11ec-a383-acde48001122
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Remove worker <WorkerState 'tcp://127.0.0.1:49938', name: tcp://127.0.0.1:49938, status: paused, memory: 1, processing: 0>
distributed.core - INFO - Removing comms to tcp://127.0.0.1:49938
distributed.scheduler - INFO - Lost all workers
distributed.scheduler - ERROR - Couldn't gather keys {'f1-13a8f680-91b5-447e-8bca-d271f5a06562': ['tcp://127.0.0.1:49938']} state: ['no-worker'] workers: ['tcp://127.0.0.1:49938']
NoneType: None
distributed.scheduler - ERROR - Workers don't have promised key: ['tcp://127.0.0.1:49938'], f1-13a8f680-91b5-447e-8bca-d271f5a06562
NoneType: None
distributed.scheduler - INFO - Register worker <WorkerState 'tcp://127.0.0.1:49947', name: tcp://127.0.0.1:49947, status: undefined, memory: 0, processing: 0>
distributed.scheduler - INFO - Starting worker compute stream, tcp://127.0.0.1:49947
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Remove worker <WorkerState 'tcp://127.0.0.1:49947', name: tcp://127.0.0.1:49947, status: running, memory: 1, processing: 0>
distributed.core - INFO - Removing comms to tcp://127.0.0.1:49947
distributed.scheduler - INFO - Lost all workers
distributed.scheduler - ERROR - Couldn't gather keys {'f1-13a8f680-91b5-447e-8bca-d271f5a06562': ['tcp://127.0.0.1:49947']} state: ['no-worker'] workers: ['tcp://127.0.0.1:49947']
NoneType: None
distributed.scheduler - ERROR - Workers don't have promised key: ['tcp://127.0.0.1:49947'], f1-13a8f680-91b5-447e-8bca-d271f5a06562
NoneType: None
distributed.scheduler - INFO - Register worker <WorkerState 'tcp://127.0.0.1:49951', name: tcp://127.0.0.1:49951, status: undefined, memory: 0, processing: 0>
distributed.scheduler - INFO - Starting worker compute stream, tcp://127.0.0.1:49951
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Remove worker <WorkerState 'tcp://127.0.0.1:49951', name: tcp://127.0.0.1:49951, status: running, memory: 0, processing: 1>
distributed.core - INFO - Removing comms to tcp://127.0.0.1:49951
distributed.scheduler - INFO - Task f1-13a8f680-91b5-447e-8bca-d271f5a06562 marked as failed because 0 workers died while trying to run it
distributed.scheduler - INFO - Lost all workers
distributed.scheduler - INFO - Register worker <WorkerState 'tcp://127.0.0.1:49954', name: tcp://127.0.0.1:49954, status: undefined, memory: 0, processing: 0>
distributed.scheduler - INFO - Starting worker compute stream, tcp://127.0.0.1:49954
distributed.core - INFO - Starting established connection

Worker log:

running f1
distributed.worker - WARNING - Worker is at 120% memory usage. Pausing worker.  Process memory: 217.52 MiB -- Worker memory limit: 180.00 MiB
distributed.worker - WARNING - Unmanaged memory use is high. This may indicate a memory leak or the memory may not be released to the OS; see https://distributed.dask.org/en/latest/worker.html#memtrim for more information. -- Unmanaged memory: 217.52 MiB -- Worker memory limit: 180.00 MiB
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - INFO - Worker process 25469 was killed by signal 15
distributed.nanny - WARNING - Restarting worker
distributed.worker - INFO -       Start worker at:      tcp://127.0.0.1:49947
distributed.worker - INFO -          Listening to:      tcp://127.0.0.1:49947
distributed.worker - INFO -          dashboard at:            127.0.0.1:49948
distributed.worker - INFO - Waiting to connect to:       tcp://127.0.0.1:8786
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -               Threads:                          1
distributed.worker - INFO -                Memory:                 180.00 MiB
distributed.worker - INFO -       Local Directory: /Users/my_name/git/my_proj/dask-worker-space/worker-8gvej_no
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -         Registered to:       tcp://127.0.0.1:8786
distributed.worker - INFO - -------------------------------------------------
distributed.core - INFO - Starting established connection
running f1
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - INFO - Worker process 25477 was killed by signal 15
distributed.nanny - WARNING - Restarting worker
distributed.worker - INFO -       Start worker at:      tcp://127.0.0.1:49951
distributed.worker - INFO -          Listening to:      tcp://127.0.0.1:49951
distributed.worker - INFO -          dashboard at:            127.0.0.1:49952
distributed.worker - INFO - Waiting to connect to:       tcp://127.0.0.1:8786
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -               Threads:                          1
distributed.worker - INFO -                Memory:                 180.00 MiB
distributed.worker - INFO -       Local Directory: /Users/my_name/git/my_proj/dask-worker-space/worker-upiseycf
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -         Registered to:       tcp://127.0.0.1:8786
distributed.worker - INFO - -------------------------------------------------
distributed.core - INFO - Starting established connection
running f1
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - INFO - Worker process 25479 was killed by signal 15
distributed.nanny - WARNING - Restarting worker
distributed.worker - INFO -       Start worker at:      tcp://127.0.0.1:49954
distributed.worker - INFO -          Listening to:      tcp://127.0.0.1:49954
distributed.worker - INFO -          dashboard at:            127.0.0.1:49955
distributed.worker - INFO - Waiting to connect to:       tcp://127.0.0.1:8786
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -               Threads:                          1
distributed.worker - INFO -                Memory:                 180.00 MiB
distributed.worker - INFO -       Local Directory: /Users/my_name/git/my_proj/dask-worker-space/worker-sb_vegpj
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -         Registered to:       tcp://127.0.0.1:8786
distributed.worker - INFO - -------------------------------------------------
distributed.core - INFO - Starting established connection