Occasional task transition priority assertion error

Hi, I’m trying to track down the source of an error that we see semi-frequently on our Dask cluster. We submit either a custom function via client.submit or an iterable plus a custom map function via client.map (example usage below). We never set the priority of our submitted functions, although we make heavy use of tasks submitting other tasks via get_client on the worker nodes. As far as I can tell, the error is not preceded by any other scheduler log entries, worker errors, or user-code exceptions; it appears only in the scheduler call stack. I’m happy to enable any additional logging or diagnostics that might help isolate the issue.

Dask version: 2024.4.1
Python version: 3.10.14

Usage:

from distributed import fire_and_forget, get_client

client = get_client()
d_outs = client.map(
    func,
    iterables,
    **kwargs,
    pure=False,
)
fire_and_forget(d_outs)

and

client = get_client()
d_out = client.submit(rfunc, func_key, *fargs, **fkwargs, pure=False)
fire_and_forget(d_out)

This is generic code that all of our task submission functions flow through, so we blanket-mark them as impure (some of them rely on external mutable data sources) and use fire_and_forget for functions that run only for their side effects, where no consumer cares about the result.
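To illustrate why the blanket pure=False matters: with the default pure=True, identical calls hash to the same key and the scheduler treats them as a single task, while pure=False gives every call a fresh key. The sketch below shows this on a throwaway in-process LocalCluster; inc and compare_keys are hypothetical names used only for the demo.

```python
from distributed import Client, LocalCluster


def inc(x):
    return x + 1


def compare_keys():
    """Return (pure keys equal?, impure keys equal?) for two identical calls."""
    with LocalCluster(n_workers=1, threads_per_worker=1, processes=False) as cluster:
        with Client(cluster) as client:
            a = client.submit(inc, 1)              # pure=True is the default
            b = client.submit(inc, 1)              # same func/args -> same key
            c = client.submit(inc, 1, pure=False)  # fresh uuid-based key
            d = client.submit(inc, 1, pure=False)  # another fresh key
            return a.key == b.key, c.key == d.key


if __name__ == "__main__":
    print(compare_keys())
```

With pure=False every submission is a distinct task even when the arguments match, which is the behaviour you want for functions that read external mutable state.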

Here are the two stack traces (I’m including both since they tend to show up together when this occurs):

2024-04-09 14:12:38,913 - distributed.scheduler - ERROR - Error transitioning 'r_my_function-41a133e5-ea79-4ce0-86b4-e8b3a4241ea3-3' from 'waiting' to 'processing'
Traceback (most recent call last):
  File "/pearlstreet/pst-compute/.venv/lib/python3.10/site-packages/distributed/scheduler.py", line 1958, in _transition
    recommendations, client_msgs, worker_msgs = func(
  File "/pearlstreet/pst-compute/.venv/lib/python3.10/site-packages/distributed/scheduler.py", line 2337, in _transition_waiting_processing
    return self._add_to_processing(ts, ws, stimulus_id=stimulus_id)
  File "/pearlstreet/pst-compute/.venv/lib/python3.10/site-packages/distributed/scheduler.py", line 3257, in _add_to_processing
    return {}, {}, {ws.address: [self._task_to_msg(ts)]}
  File "/pearlstreet/pst-compute/.venv/lib/python3.10/site-packages/distributed/scheduler.py", line 3429, in _task_to_msg
    assert ts.priority, ts
AssertionError: <TaskState 'r_my_function-41a133e5-ea79-4ce0-86b4-e8b3a4241ea3-3' processing>

and

2024-04-09 14:12:38,917 - distributed.scheduler - ERROR - <TaskState 'r_my_function-41a133e5-ea79-4ce0-86b4-e8b3a4241ea3-3' processing>
Traceback (most recent call last):
  File "/pearlstreet/pst-compute/.venv/lib/python3.10/site-packages/distributed/utils.py", line 832, in wrapper
    return await func(*args, **kwargs)
  File "/pearlstreet/pst-compute/.venv/lib/python3.10/site-packages/distributed/scheduler.py", line 4719, in update_graph
    self._create_taskstate_from_graph(
  File "/pearlstreet/pst-compute/.venv/lib/python3.10/site-packages/distributed/scheduler.py", line 4657, in _create_taskstate_from_graph
    self.transitions(recommendations, stimulus_id)
  File "/pearlstreet/pst-compute/.venv/lib/python3.10/site-packages/distributed/scheduler.py", line 7984, in transitions
    self._transitions(recommendations, client_msgs, worker_msgs, stimulus_id)
  File "/pearlstreet/pst-compute/.venv/lib/python3.10/site-packages/distributed/scheduler.py", line 2065, in _transitions
    new_recs, new_cmsgs, new_wmsgs = self._transition(key, finish, stimulus_id)
  File "/pearlstreet/pst-compute/.venv/lib/python3.10/site-packages/distributed/scheduler.py", line 1958, in _transition
    recommendations, client_msgs, worker_msgs = func(
  File "/pearlstreet/pst-compute/.venv/lib/python3.10/site-packages/distributed/scheduler.py", line 2337, in _transition_waiting_processing
    return self._add_to_processing(ts, ws, stimulus_id=stimulus_id)
  File "/pearlstreet/pst-compute/.venv/lib/python3.10/site-packages/distributed/scheduler.py", line 3257, in _add_to_processing
    return {}, {}, {ws.address: [self._task_to_msg(ts)]}
  File "/pearlstreet/pst-compute/.venv/lib/python3.10/site-packages/distributed/scheduler.py", line 3429, in _task_to_msg
    assert ts.priority, ts
AssertionError: <TaskState 'r_my_function-41a133e5-ea79-4ce0-86b4-e8b3a4241ea3-3' processing>

I’ll also mention that this error seems to occur much more frequently since we upgraded from 2023.9.3 to 2024.4.1.
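The failing line in both tracebacks above is a plain truthiness check, assert ts.priority, ts. The sketch below uses a hypothetical FakeTaskState stand-in (not distributed's real TaskState class) to show which priority values would trip it: the check fails when the priority is unset (None) or an empty tuple, while any non-empty tuple, even one of zeros, passes. This only illustrates the assertion itself; it does not explain how a task reached processing without a priority.

```python
from dataclasses import dataclass
from typing import Optional, Tuple


@dataclass
class FakeTaskState:
    # Hypothetical stand-in for distributed's TaskState; only the two
    # attributes relevant to the failing assertion are modelled here.
    key: str
    priority: Optional[Tuple[int, ...]] = None


def check_priority(ts: FakeTaskState) -> bool:
    """Mirror the scheduler's `assert ts.priority, ts` check.

    Returns True if the assertion would pass, False if it would raise.
    Note: running Python with -O strips assert statements, so under -O
    this helper always returns True.
    """
    try:
        assert ts.priority, ts
    except AssertionError:
        return False
    return True


cases = {
    "unset (None)": FakeTaskState("a", None),
    "empty tuple": FakeTaskState("b", ()),
    "all-zero tuple": FakeTaskState("c", (0, 0, 0)),
    "typical tuple": FakeTaskState("d", (0, 1, 5)),
}
for label, ts in cases.items():
    print(f"{label:15} -> passes: {check_priority(ts)}")
```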

Hi @lucasgodshalk, welcome to the Dask community!

I have to admit that I won’t be able to help you here; this seems to be a complex scheduling problem, probably related to launching tasks from tasks.

cc @crusaderky @fjetter who might have some ideas.

Thanks @guillaumeeb, do you think I should go ahead and open a GitHub issue? I initially didn’t want to post there until I could figure out reproduction steps.

Well, I think it would be much better to have at least a small reproducer, even if it does not always reproduce the issue…
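A reproducer could start by stressing the same pattern described in the original post: impure client.submit and client.map calls wrapped in fire_and_forget, issued both from the client and from inside tasks via get_client(). The sketch below is a hypothetical starting point, not a confirmed reproducer: the leaf, parent and run_repro names, the cluster sizing and the round/fan-out counts are all assumptions, and it uses an in-process LocalCluster rather than the real deployment.

```python
import time

from distributed import Client, LocalCluster, fire_and_forget, get_client


def leaf(i):
    # Hypothetical stand-in for the real side-effect-only work.
    time.sleep(0.01)
    return i


def parent(n):
    # Runs on a worker and submits child tasks through the worker's client,
    # mirroring the "tasks submitting tasks" pattern from the original post.
    client = get_client()
    fire_and_forget(client.map(leaf, range(n), pure=False))
    fire_and_forget(client.submit(leaf, n, pure=False))
    return n


def run_repro(rounds=20, fanout=50, settle=5.0):
    """Hammer the scheduler with nested, impure, fire-and-forget submissions.

    Watch the scheduler log for "Error transitioning ... from 'waiting' to
    'processing'" while this runs. Returns the number of parent tasks sent.
    """
    with LocalCluster(n_workers=4, threads_per_worker=2, processes=False) as cluster:
        with Client(cluster) as client:
            for _ in range(rounds):
                fire_and_forget(client.submit(parent, fanout, pure=False))
            time.sleep(settle)  # give the fire-and-forget tasks time to run
    return rounds


if __name__ == "__main__":
    run_repro()
```

Since the error is intermittent, it may take many runs (or larger rounds/fanout values) before anything shows up, and it may not reproduce on a local cluster at all; swapping the LocalCluster for the production scheduler address would be the natural next step.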

Just a minor update: I realized there was a set of log entries that all showed the same stack trace for this exception. Here’s the full stack trace:

2024-04-22 05:54:30,594 - distributed.core - ERROR - Exception while handling op register-client
Traceback (most recent call last):
  File "/pearlstreet/pst-compute/.venv/lib/python3.10/site-packages/distributed/core.py", line 921, in _handle_comm
    result = await result
  File "/pearlstreet/pst-compute/.venv/lib/python3.10/site-packages/distributed/scheduler.py", line 5518, in add_client
    await self.handle_stream(comm=comm, extra={"client": client})
  File "/pearlstreet/pst-compute/.venv/lib/python3.10/site-packages/distributed/core.py", line 1000, in handle_stream
    await handler(**merge(extra, msg))
  File "/pearlstreet/pst-compute/.venv/lib/python3.10/site-packages/distributed/utils.py", line 801, in wrapper
    return await func(*args, **kwargs)
  File "/pearlstreet/pst-compute/.venv/lib/python3.10/site-packages/distributed/scheduler.py", line 4637, in update_graph
    self._create_taskstate_from_graph(
  File "/pearlstreet/pst-compute/.venv/lib/python3.10/site-packages/distributed/scheduler.py", line 4578, in _create_taskstate_from_graph
    self.transitions(recommendations, stimulus_id)
  File "/pearlstreet/pst-compute/.venv/lib/python3.10/site-packages/distributed/scheduler.py", line 7620, in transitions
    self._transitions(recommendations, client_msgs, worker_msgs, stimulus_id)
  File "/pearlstreet/pst-compute/.venv/lib/python3.10/site-packages/distributed/scheduler.py", line 2038, in _transitions
    new_recs, new_cmsgs, new_wmsgs = self._transition(key, finish, stimulus_id)
  File "/pearlstreet/pst-compute/.venv/lib/python3.10/site-packages/distributed/scheduler.py", line 1932, in _transition
    recommendations, client_msgs, worker_msgs = func(
  File "/pearlstreet/pst-compute/.venv/lib/python3.10/site-packages/distributed/scheduler.py", line 2305, in transition_waiting_processing
    return self._add_to_processing(ts, ws, stimulus_id=stimulus_id)
  File "/pearlstreet/pst-compute/.venv/lib/python3.10/site-packages/distributed/scheduler.py", line 3207, in _add_to_processing
    return {}, {}, {ws.address: [self._task_to_msg(ts)]}
  File "/pearlstreet/pst-compute/.venv/lib/python3.10/site-packages/distributed/scheduler.py", line 3374, in _task_to_msg
    assert ts.priority, ts
AssertionError: <TaskState '_r_my_func-0da39ce5-0abd-4e2f-b7b6-db120f768fa8' processing>
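The failing keys have different shapes: 'r_my_function-<uuid>-3' carries a trailing index, while '_r_my_func-<uuid>' does not. Assuming distributed names impure tasks as <funcname>-<uuid4> for client.submit and <funcname>-<uuid4>-<index> for client.map (my understanding of the implementation, worth confirming against the installed version), the hypothetical classify_key helper below sorts a key by its shape, which may help tell which call path produced each failing task.

```python
import re

# Assumed key shapes for impure (pure=False) calls:
#   client.submit -> "<funcname>-<uuid4>"
#   client.map    -> "<funcname>-<uuid4>-<index>"
_UUID = r"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}"
_KEY_RE = re.compile(rf"^(?P<name>.+?)-(?P<uid>{_UUID})(?:-(?P<index>\d+))?$")


def classify_key(key: str) -> dict:
    """Split a task key into name, uuid and optional map index.

    The "source" field is a guess: "map" when an index follows the uuid,
    "submit" when it does not, "unknown" when no uuid is found.
    """
    m = _KEY_RE.match(key)
    if m is None:
        return {"source": "unknown", "name": None, "uid": None, "index": None}
    index = m.group("index")
    return {
        "source": "map" if index is not None else "submit",
        "name": m.group("name"),
        "uid": m.group("uid"),
        "index": int(index) if index is not None else None,
    }


for key in (
    "r_my_function-41a133e5-ea79-4ce0-86b4-e8b3a4241ea3-3",
    "_r_my_func-0da39ce5-0abd-4e2f-b7b6-db120f768fa8",
):
    print(key, "->", classify_key(key))
```

Under that assumption, the first key would come from a client.map call and the second from a client.submit call, which would suggest the problem is not tied to only one of the two submission paths.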