Random FutureCancelledError() with unknown cause inside computes

I recently added docling to a dask pipeline. Docling is a document processor with its own thread pool, which might be causing some contention with dask. I limit it to 1/5th of the available cores, and apart from this one heavy multithreaded task the load on the workers is not too high.
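
For reference, the 1/5th budget is computed roughly like this (a sketch; exactly how docling picks the limit up, e.g. via OMP_NUM_THREADS or its accelerator options, is an assumption and varies between versions):

    import os

    # Give docling roughly a fifth of the machine; the rest stays with dask.
    docling_threads = max(1, (os.cpu_count() or 1) // 5)

    # Assumption: docling sizes its internal pool from OMP_NUM_THREADS when no
    # explicit option is passed, so set it before docling is imported.
    os.environ["OMP_NUM_THREADS"] = str(docling_threads)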

My jobs started failing randomly with FutureCancelledError() without a clear cause. Client-side tracebacks:

[2025-01-21 20:50:02,698 ERROR service] worker 2: handling event FileCreate(timestamp=1737492574, service='file_processor', customer_id=ObjectId('6660a87e4f67b21052795d67'), ki_id=ObjectId('675c4430c2f1f87f8f1050da'), ki_media='file', kind='reuse_created'): docling_extract_and_split-fe71a502-edea-41cb-b7c8-9ce46137d437 cancelled for reason: unknown.
Traceback (most recent call last):
  File "/backend/vascular/bin.runfiles/_main/backend/vascular/src/service.py", line 86, in run
    await self.process_event(event)
  File "/backend/vascular/bin.runfiles/_main/backend/vascular/src/service.py", line 119, in process_event
    await future
  File "/backend/vascular/bin.runfiles/rules_python~~pip~pypi_312_distributed_py3_none_any_0c75888b/site-packages/distributed/client.py", line 409, in _result
    raise exc.with_traceback(tb)
  File "/backend/dask/bin.runfiles/_main/backend/dask/src/event_defs.py", line 103, in handle
  File "/backend/dask/bin.runfiles/_main/backend/dask/src/events/file.py", line 106, in file_create
  File "/backend/dask/bin.runfiles/_main/backend/dask/src/events/file.py", line 209, in process_file
  File "/backend/dask/bin.runfiles/rules_python~~pip~pypi_312_dask_py3_none_any_484c317e/site-packages/dask/base.py", line 376, in compute
  File "/backend/dask/bin.runfiles/rules_python~~pip~pypi_312_dask_py3_none_any_484c317e/site-packages/dask/base.py", line 664, in compute
  File "/backend/dask/bin.runfiles/rules_python~~pip~pypi_312_distributed_py3_none_any_0c75888b/site-packages/distributed/client.py", line 2414, in _gather
distributed.client.FutureCancelledError: docling_extract_and_split-fe71a502-edea-41cb-b7c8-9ce46137d437 cancelled for reason: unknown.
[2025-01-21 20:50:24,355 ERROR service] worker 3: handling event FileCreate(timestamp=1737492575, service='file_processor', customer_id=ObjectId('6660a87e4f67b21052795d67'), ki_id=ObjectId('6790085f9828b5848c8a061b'), ki_media='file', kind='upload_created'): write_data_to_mongo-46e60f1a-5a0a-4ead-abb5-b5329740ef53 cancelled for reason: unknown.
Traceback (most recent call last):
  File "/backend/vascular/bin.runfiles/_main/backend/vascular/src/service.py", line 86, in run
    await self.process_event(event)
  File "/backend/vascular/bin.runfiles/_main/backend/vascular/src/service.py", line 119, in process_event
    await future
  File "/backend/vascular/bin.runfiles/rules_python~~pip~pypi_312_distributed_py3_none_any_0c75888b/site-packages/distributed/client.py", line 409, in _result
    raise exc.with_traceback(tb)
  File "/backend/dask/bin.runfiles/_main/backend/dask/src/event_defs.py", line 103, in handle
  File "/backend/dask/bin.runfiles/_main/backend/dask/src/events/file.py", line 106, in file_create
  File "/backend/dask/bin.runfiles/_main/backend/dask/src/events/file.py", line 237, in process_file
  File "/backend/dask/bin.runfiles/rules_python~~pip~pypi_312_dask_py3_none_any_484c317e/site-packages/dask/base.py", line 376, in compute
  File "/backend/dask/bin.runfiles/rules_python~~pip~pypi_312_dask_py3_none_any_484c317e/site-packages/dask/base.py", line 664, in compute
  File "/backend/dask/bin.runfiles/rules_python~~pip~pypi_312_distributed_py3_none_any_0c75888b/site-packages/distributed/client.py", line 2414, in _gather
distributed.client.FutureCancelledError: write_data_to_mongo-46e60f1a-5a0a-4ead-abb5-b5329740ef53 cancelled for reason: unknown.

Note that these are two different tasks; the second one runs after the docling one.

Worker-side failures look like this (there is no other indication of a problem besides these messages in the middle of otherwise normal app logs):

[2025-01-21 20:52:33,586 INFO  worker] starting task ask_vllm
[2025-01-21 20:52:33,589 DEBUG rs_appcore.redis_lock] priority 0: using single lock
[2025-01-21 20:52:33,590 DEBUG rs_appcore.redis_lock] locking s1_lock:vllm_completions:2/6790091136f4bd8193638c56
2025-01-21 20:52:33,598 - distributed.worker - ERROR - Compute Failed
Key:       handle-5ec95939-9d5a-4ea2-9c05-ab8f4d944a27
State:     long-running
Function:  handle
args:      ()
kwargs:    {}
Exception: 'FutureCancelledError()'
Traceback: '  File "/backend/dask/bin.runfiles/_main/backend/dask/src/event_defs.py", line 103, in handle\n    file_create(self)\n  File "/backend/dask/bin.runfiles/_main/backend/dask/src/events/file.py", line 106, in file_create\n    ki, status = process_file(event.customer_id, event.ki_id)\n                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n  File "/backend/dask/bin.runfiles/_main/backend/dask/src/events/file.py", line 237, in process_file\n    ).compute()\n      ^^^^^^^^^\n  File "/backend/dask/bin.runfiles/rules_python~~pip~pypi_312_dask_py3_none_any_484c317e/site-packages/dask/base.py", line 376, in compute\n    (result,) = compute(self, traverse=False, **kwargs)\n                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n  File "/backend/dask/bin.runfiles/rules_python~~pip~pypi_312_dask_py3_none_any_484c317e/site-packages/dask/base.py", line 664, in compute\n    results = schedule(dsk, keys, **kwargs)\n              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n  File "/backend/dask/bin.runfiles/rules_python~~pip~pypi_312_distributed_py3_none_any_0c75888b/site-packages/distributed/client.py", line 2414, in _gather\n    raise exception.with_traceback(traceback)\n'

[2025-01-21 20:52:33,634 INFO  worker] starting task set_ki_status
[2025-01-21 20:52:33,635 DEBUG rs_vllm.completions] sending completions request temp=None seed=210990856

And finally, the human-readable tracebacks from the dashboard:

  File "/backend/dask/bin.runfiles/_main/backend/dask/src/event_defs.py", line 103, in handle
    file_create(self)
  File "/backend/dask/bin.runfiles/_main/backend/dask/src/events/file.py", line 106, in file_create
    ki, status = process_file(event.customer_id, event.ki_id)
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/backend/dask/bin.runfiles/_main/backend/dask/src/events/file.py", line 237, in process_file
    ).compute()
      ^^^^^^^^^
  File "/backend/dask/bin.runfiles/rules_python~~pip~pypi_312_dask_py3_none_any_484c317e/site-packages/dask/base.py", line 376, in compute
    (result,) = compute(self, traverse=False, **kwargs)
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/backend/dask/bin.runfiles/rules_python~~pip~pypi_312_dask_py3_none_any_484c317e/site-packages/dask/base.py", line 664, in compute
    results = schedule(dsk, keys, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/backend/dask/bin.runfiles/rules_python~~pip~pypi_312_distributed_py3_none_any_0c75888b/site-packages/distributed/client.py", line 2414, in _gather
    raise exception.with_traceback(traceback)
  File "/backend/dask/bin.runfiles/_main/backend/dask/src/event_defs.py", line 103, in handle
    file_create(self)
  File "/backend/dask/bin.runfiles/_main/backend/dask/src/events/file.py", line 106, in file_create
    ki, status = process_file(event.customer_id, event.ki_id)
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/backend/dask/bin.runfiles/_main/backend/dask/src/events/file.py", line 209, in process_file
    text, segments = text_extractor.extract_and_split_text().compute()
                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/backend/dask/bin.runfiles/rules_python~~pip~pypi_312_dask_py3_none_any_484c317e/site-packages/dask/base.py", line 376, in compute
    (result,) = compute(self, traverse=False, **kwargs)
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/backend/dask/bin.runfiles/rules_python~~pip~pypi_312_dask_py3_none_any_484c317e/site-packages/dask/base.py", line 664, in compute
    results = schedule(dsk, keys, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/backend/dask/bin.runfiles/rules_python~~pip~pypi_312_distributed_py3_none_any_0c75888b/site-packages/distributed/client.py", line 2414, in _gather
    raise exception.with_traceback(traceback)

None of these tracebacks touch app code; they only point to compute() calls.
The scheduler logs don't mention anything about this. The only suspicious thing is a single message about the GIL being held for a few seconds, but the failures keep happening well after that.

These errors happen early during processing, so they don't look like timeouts. Roughly 7% of pipelines are affected.

Are there any obvious steps I could take to debug this? Worker debug logs add nothing but heartbeats.
All the dask/distributed issues I could find about FutureCancelledError include a cancellation reason, but in my case it's unknown.

dask/distributed: 2025.1.0
env: k8s cluster, custom docker image for worker and scheduler

I think I've read somewhere that you shouldn't be passing Delayed objects to submit, and I found one such piece of code. I tried both adding compute() calls and removing the code entirely, but I still see a couple of FutureCancelledError()s in exactly the same places:

    with worker_client() as client:
        future = client.submit(
            generate_thumbnails,
            customer_id,
            ki_id,
            file_type.ct_label.compute(),  # added compute() on Delayed object here
            file_bytes.compute(),          # added compute() on Delayed object here
        )
        distributed.fire_and_forget(future)
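
For completeness, another way to avoid both the blocking compute() calls and the raw Delayed arguments would be to turn the inputs into Futures with client.compute, since submit resolves Future arguments before running the function (a sketch, not something I tried in this pipeline):

    with worker_client() as client:
        # client.compute turns the Delayed objects into Futures without blocking
        # this task; submit resolves Future arguments before calling the function.
        ct_label_f, file_bytes_f = client.compute([file_type.ct_label, file_bytes])
        future = client.submit(
            generate_thumbnails,
            customer_id,
            ki_id,
            ct_label_f,
            file_bytes_f,
        )
        distributed.fire_and_forget(future)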

The only other place in the pipeline that uses submit doesn't use fire_and_forget and doesn't take Delayed objects as input:

    with worker_client() as client:
        converted = client.submit(
            docling_run_submitted_pipeline,
            pipeline,
            data,
            resources={"DOCLING": 1},
        ).result()
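
For context, this relies on the workers advertising a DOCLING resource at startup; a minimal way to declare it looks something like this (how it's actually wired into the custom worker image is an assumption):

    import dask

    # Equivalent to `dask worker ... --resources "DOCLING=1"` or the env var
    # DASK_DISTRIBUTED__WORKER__RESOURCES__DOCLING=1; must be set in the worker
    # process before the worker starts.
    dask.config.set({"distributed.worker.resources.DOCLING": 1})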

Edit: I removed both of them, but nothing changed:

    generate_thumbnails(
        customer_id,
        ki_id,
        file_type.ct_label,
        file_bytes,
    ).compute()
    with annotate(resources={"DOCLING": 1}):
        converted = docling_run_submitted_pipeline(pipeline, data).compute()
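
(Side note: the dask docs warn that annotations like the resource restriction above can be dropped by low-level graph optimization, so fusion may need to be disabled around annotated computes. A sketch, not something I verified in this pipeline:)

    import dask

    # Assumption: task fusion can strip annotations from Delayed graphs, so turn
    # it off while computing the resource-annotated part.
    with dask.config.set({"optimization.fuse.active": False}):
        with dask.annotate(resources={"DOCLING": 1}):
            converted = docling_run_submitted_pipeline(pipeline, data).compute()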

Another update: I don't get any failures on a single worker. It must have something to do with tasks travelling between workers.
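
If anyone wants more state than the logs above, one thing that should capture the scheduler and worker task states around a failure is a cluster dump (sketch; the scheduler address is illustrative):

    from distributed import Client

    client = Client("tcp://dask-scheduler:8786")  # illustrative address
    # Writes the scheduler's and workers' task state to a msgpack file that can
    # be inspected offline (e.g. to follow a key's story across workers).
    client.dump_cluster_state("cluster-dump")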

After replacing some code in the pipeline with no-op functions, I got a few new errors:

2025-01-23 18:03:34,970 - distributed.worker - ERROR - Compute Failed
Key:       handle-f50b15e0-8bde-4586-bfe2-636c710ad465
State:     long-running
Task:      <Task 'handle-f50b15e0-8bde-4586-bfe2-636c710ad465' handle()>
Exception: 'Exception("AssertionError(<TaskState \'docling_make_doc-ee44052f-a624-448c-80c2-ee76ab50e39d\' processing>)")'
Traceback: ' File "/backend/dask/bin.runfiles/_main/backend/dask/src/event_defs.py", line 103, in handle\n file_create(self)\n File "/backend/dask/bin.runfiles/_main/backend/dask/src/events/file.py", line 106, in file_create\n _ki, status = process_file(event.customer_id, event.ki_id)\n ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n File "/backend/dask/bin.runfiles/_main/backend/dask/src/events/file.py", line 254, in process_file\n text, segments = text_extractor.extract_and_split_text().compute()\n ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n File "/backend/dask/bin.runfiles/rules_python~~pip~pypi_312_dask_py3_none_any_db86220c/site-packages/dask/base.py", line 374, in compute\n (result,) = compute(self, traverse=False, **kwargs)\n ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n File "/backend/dask/bin.runfiles/rules_python~~pip~pypi_312_dask_py3_none_any_db86220c/site-packages/dask/base.py", line 662, in compute\n results = schedule(dsk, keys, **kwargs)\n ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n File "/backend/dask/bin.runfiles/rules_python~~pip~pypi_312_distributed_py3_none_any_0b9c0ebc/site-packages/distributed/client.py", line 2426, in _gather\n raise exception.with_traceback(traceback)\n'

The traceback looks the same as before, though:

  File "/backend/dask/bin.runfiles/_main/backend/dask/src/event_defs.py", line 103, in handle
    file_create(self)
  File "/backend/dask/bin.runfiles/_main/backend/dask/src/events/file.py", line 106, in file_create
    _ki, status = process_file(event.customer_id, event.ki_id)
                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/backend/dask/bin.runfiles/_main/backend/dask/src/events/file.py", line 254, in process_file
    text, segments = text_extractor.extract_and_split_text().compute()
                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/backend/dask/bin.runfiles/rules_python~~pip~pypi_312_dask_py3_none_any_db86220c/site-packages/dask/base.py", line 374, in compute
    (result,) = compute(self, traverse=False, **kwargs)
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/backend/dask/bin.runfiles/rules_python~~pip~pypi_312_dask_py3_none_any_db86220c/site-packages/dask/base.py", line 662, in compute
    results = schedule(dsk, keys, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/backend/dask/bin.runfiles/rules_python~~pip~pypi_312_distributed_py3_none_any_0b9c0ebc/site-packages/distributed/client.py", line 2426, in _gather
    raise exception.with_traceback(traceback)

And on the scheduler it looks like this:

2025-01-23 18:01:56,088 - distributed.scheduler - INFO - User asked for computation on lost data, docling_extract_and_split-0eb4ee4d-992b-45b9-a0b0-36c5f1a8a84a
2025-01-23 18:02:02,797 - distributed.worker - INFO - Run out-of-band function 'dask_health'
2025-01-23 18:02:02,799 - distributed.worker - INFO - Run out-of-band function 'dask_health'
2025-01-23 18:02:04,609 - distributed.scheduler - ERROR - Error transitioning 'docling_make_doc-a878f563-bedc-428a-ba3e-fff44fd5a7f0' from 'waiting' to 'processing'
Traceback (most recent call last):
  File "/backend/dask/bin.runfiles/rules_python~~pip~pypi_312_distributed_py3_none_any_0b9c0ebc/site-packages/distributed/scheduler.py", line 2007, in _transition
    recommendations, client_msgs, worker_msgs = func(
                                                ^^^^^
  File "/backend/dask/bin.runfiles/rules_python~~pip~pypi_312_distributed_py3_none_any_0b9c0ebc/site-packages/distributed/scheduler.py", line 2458, in _transition_waiting_processing
    return self._add_to_processing(ts, ws, stimulus_id=stimulus_id)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/backend/dask/bin.runfiles/rules_python~~pip~pypi_312_distributed_py3_none_any_0b9c0ebc/site-packages/distributed/scheduler.py", line 3407, in _add_to_processing
    return {}, {}, {ws.address: [self._task_to_msg(ts)]}
                                 ^^^^^^^^^^^^^^^^^^^^^
  File "/backend/dask/bin.runfiles/rules_python~~pip~pypi_312_distributed_py3_none_any_0b9c0ebc/site-packages/distributed/scheduler.py", line 3579, in _task_to_msg
    assert ts.priority, ts
           ^^^^^^^^^^^
AssertionError: <TaskState 'docling_make_doc-a878f563-bedc-428a-ba3e-fff44fd5a7f0' processing>
2025-01-23 18:02:04,617 - distributed.scheduler - ERROR - <TaskState 'docling_make_doc-a878f563-bedc-428a-ba3e-fff44fd5a7f0' processing>
2025-01-23 18:02:04,629 - distributed.protocol.pickle - ERROR - Failed to serialize <TaskState 'docling_make_doc-a878f563-bedc-428a-ba3e-fff44fd5a7f0' processing>.
Traceback (most recent call last):
  File "/backend/dask/bin.runfiles/rules_python~~pip~pypi_312_distributed_py3_none_any_0b9c0ebc/site-packages/distributed/scheduler.py", line 4911, in update_graph
    metrics = self._create_taskstate_from_graph(
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/backend/dask/bin.runfiles/rules_python~~pip~pypi_312_distributed_py3_none_any_0b9c0ebc/site-packages/distributed/scheduler.py", line 4788, in _create_taskstate_from_graph
    self.transitions(recommendations, stimulus_id)
  File "/backend/dask/bin.runfiles/rules_python~~pip~pypi_312_distributed_py3_none_any_0b9c0ebc/site-packages/distributed/scheduler.py", line 8231, in transitions
    self._transitions(recommendations, client_msgs, worker_msgs, stimulus_id)
  File "/backend/dask/bin.runfiles/rules_python~~pip~pypi_312_distributed_py3_none_any_0b9c0ebc/site-packages/distributed/scheduler.py", line 2124, in _transitions
    new_recs, new_cmsgs, new_wmsgs = self._transition(key, finish, stimulus_id)
                                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/backend/dask/bin.runfiles/rules_python~~pip~pypi_312_distributed_py3_none_any_0b9c0ebc/site-packages/distributed/scheduler.py", line 2007, in _transition
    recommendations, client_msgs, worker_msgs = func(
                                                ^^^^^
  File "/backend/dask/bin.runfiles/rules_python~~pip~pypi_312_distributed_py3_none_any_0b9c0ebc/site-packages/distributed/scheduler.py", line 2458, in _transition_waiting_processing
    return self._add_to_processing(ts, ws, stimulus_id=stimulus_id)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/backend/dask/bin.runfiles/rules_python~~pip~pypi_312_distributed_py3_none_any_0b9c0ebc/site-packages/distributed/scheduler.py", line 3407, in _add_to_processing
    return {}, {}, {ws.address: [self._task_to_msg(ts)]}
                                 ^^^^^^^^^^^^^^^^^^^^^
  File "/backend/dask/bin.runfiles/rules_python~~pip~pypi_312_distributed_py3_none_any_0b9c0ebc/site-packages/distributed/scheduler.py", line 3579, in _task_to_msg
    assert ts.priority, ts
           ^^^^^^^^^^^
AssertionError: <TaskState 'docling_make_doc-a878f563-bedc-428a-ba3e-fff44fd5a7f0' processing>

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/backend/dask/bin.runfiles/rules_python~~pip~pypi_312_distributed_py3_none_any_0b9c0ebc/site-packages/distributed/protocol/pickle.py", line 60, in dumps
    result = pickle.dumps(x, **dump_kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
_pickle.PicklingError: Can't pickle <function docling_extract_and_split at 0x7f8e507196c0>: it's not the same object as backend.dask.src.events.data_extractors.docling_extract_and_split

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/backend/dask/bin.runfiles/rules_python~~pip~pypi_312_distributed_py3_none_any_0b9c0ebc/site-packages/distributed/protocol/pickle.py", line 65, in dumps
    pickler.dump(x)
_pickle.PicklingError: Can't pickle <function docling_extract_and_split at 0x7f8e507196c0>: it's not the same object as backend.dask.src.events.data_extractors.docling_extract_and_split

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/backend/dask/bin.runfiles/rules_python~~pip~pypi_312_distributed_py3_none_any_0b9c0ebc/site-packages/distributed/protocol/pickle.py", line 77, in dumps
    result = cloudpickle.dumps(x, **dump_kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/backend/dask/bin.runfiles/rules_python~~pip~pypi_312_cloudpickle_py3_none_any_c8c5a442/site-packages/cloudpickle/cloudpickle.py", line 1537, in dumps
    cp.dump(obj)
  File "/backend/dask/bin.runfiles/rules_python~~pip~pypi_312_cloudpickle_py3_none_any_c8c5a442/site-packages/cloudpickle/cloudpickle.py", line 1303, in dump
    return super().dump(obj)
           ^^^^^^^^^^^^^^^^^
TypeError: cannot pickle 'weakref.ReferenceType' object

This looks like some race condition rather than the cause of the original error, though.

I identified the problematic line of code. It wasn't any of the external dependencies, and not even the docling thread pool, which I removed during testing. The problem was somehow caused by a .persist() call whose result was stored as a class attribute and used a bit later. Replacing it with .compute() stopped all the random failures.
Other persist / dask.persist code never caused this.
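
To illustrate the shape of it (the class and attribute names here are made up, not the real code):

    class TextExtractor:  # hypothetical, simplified
        def __init__(self, file_bytes):
            # The problematic line: the persisted object is backed by futures,
            # and presumably once it travels to another worker (or its owning
            # client goes away) those keys get released, so dependent tasks are
            # cancelled with reason "unknown".
            self._doc = docling_make_doc(file_bytes).persist()

        def extract_and_split_text(self):
            # Used "a bit later", possibly on a different worker.
            return docling_extract_and_split(self._doc)

    # The change that stopped the random failures: materialize the value up front,
    #     self._doc = docling_make_doc(file_bytes).compute()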

I ended up restructuring the code and reducing the number of nested .compute() layers.
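
Roughly, the idea was to build more of the graph up front and compute it in fewer places instead of calling compute() inside tasks that are themselves inside compute(). A simplified sketch with made-up stage functions (not the real restructure):

    from dask import delayed

    # Stand-ins for the real pipeline stages.
    @delayed
    def extract(file_bytes):
        return f"doc({len(file_bytes)} bytes)"

    @delayed
    def split(doc):
        return [doc]

    @delayed
    def write_to_mongo(segments):
        return len(segments)

    def process_file(file_bytes):
        doc = extract(file_bytes)
        segments = split(doc)
        # One graph, one compute() at the top, instead of compute() inside compute().
        return write_to_mongo(segments).compute()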