Dask tests randomly fail with "Heartbeat to scheduler failed" message

Problem:
I have refactored my tests to use gen_cluster for generating local Dask clusters. A subset of them fail inconsistently, typically with the message “Heartbeat to scheduler failed”. I suspect a race condition across tests: the same three tests tend to fail, but not on every run, and each can be stepped through with the debugger without error until the test finishes, at which point the error is thrown.

Primer:
Hidebound is a file-based, ephemeral database built on Dash, Pandas and Dask. Originally, it was built on Pandas and was fully tested and functional; I am now refactoring it to use Dask. The entire app can be deployed as a Docker container, and the tests can be run via the CLI (bin/hidebound) within the repo.
Install instructions: Introduction — hidebound documentation
Test command: bin/hidebound test, or if using VSCode, Run Task > test.
- this command will build and start the container first if it does not already exist

Unfortunately, I don’t think there is a simpler way to reproduce the errors than building the app and running the tests, since most of my tests pass consistently. Fortunately, the CLI makes that exceedingly easy.

Example:

from distributed.utils_test import gen_cluster
import pytest

from hidebound.core.database import Database
import hidebound.core.tools as hbt
# make_dirs, make_files, specs are custom fixtures defined in conftest.py

DASK_WORKERS = 2
GEN_KWARGS = dict(
    nthreads=[('127.0.0.1', i) for i in range(1, DASK_WORKERS + 1)],
    cluster_dump_directory='/tmp/cluster_dump',
)

@gen_cluster(**GEN_KWARGS)
async def test_search(scheduler, *workers, make_dirs, make_files, specs):
    Spec001, Spec002, _ = specs
    ingress, staging, _ = make_dirs

    db = Database(
        ingress, staging, [Spec001, Spec002], dask_workers=DASK_WORKERS
    )
    db.update()
    db.search('SELECT * FROM data WHERE version == 1')
    # debugger will make it here without issue
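
For comparison, my understanding of the plain gen_cluster pattern from distributed's test utilities (without mixing in pytest fixtures) is roughly the following; the client=True flag and the injected argument order are from my reading of distributed.utils_test and may differ across versions:

from distributed.utils_test import gen_cluster

@gen_cluster(client=True)
async def test_plain_gen_cluster(c, s, a, b):
    # c: Client, s: Scheduler, a and b: Workers -- all injected by the decorator
    future = c.submit(lambda x: x + 1, 1)
    assert await future == 2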

Truncated traceback:

ValueError: Must have equal len keys and value when setting with an iterable
scheduler = <Scheduler 'tcp://127.0.0.1:35927', workers: 0, cores: 0, tasks: 0>
make_dirs = ('/tmp/tmp1mydc98x/ingress', '/tmp/tmp1mydc98x/hidebound', '/tmp/tmp1mydc98x/archive')
make_files = ['/tmp/tmp1mydc98x/ingress/proj001/spec001/pizza/p-proj001_s-spec001_d-pizza_v001/p-proj001_s-spec001_d-pizza_v001_c00...ess/proj001/spec001/pizza/p-proj001_s-spec001_d-pizza_v002/p-proj001_s-spec001_d-pizza_v002_c0000-0000_f0003.png', ...]
specs = (<class 'conftest.Spec001'>, <class 'conftest.Spec002'>, <class 'conftest.BadSpec'>)
workers = (<Worker 'tcp://127.0.0.1:45181', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>, <Worker 'tcp://127.0.0.1:41433', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>)

    @pytest.mark.flakey
    @gen_cluster(**GEN_KWARGS)
    async def test_search(scheduler, *workers, make_dirs, make_files, specs):
        Spec001, Spec002, _ = specs
        ingress, staging, _ = make_dirs
    
        db = Database(
            ingress, staging, [Spec001, Spec002], dask_workers=DASK_WORKERS
        )
        db.update()
        db.search('SELECT * FROM data WHERE version == 1')
       # debugger will make to this point no problem

python/hidebound/core/database_test.py:750: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
python/hidebound/core/database.py:547: in search
    {'data': self.read(group_by_asset=group_by_asset)}
python/hidebound/core/database.py:353: in read
    data.loc[mask, col] = traits.loc[mask, col]
../.local/lib/python3.7/site-packages/pandas/core/indexing.py:723: in __setitem__
    iloc._setitem_with_indexer(indexer, value, self.name)
../.local/lib/python3.7/site-packages/pandas/core/indexing.py:1730: in _setitem_with_indexer
    self._setitem_with_indexer_split_path(indexer, value, name)

...

Message: 'Heartbeat to scheduler failed'
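
For reference, that ValueError is the generic pandas error raised when a .loc assignment receives an iterable whose length doesn't match the number of selected cells. A minimal sketch with made-up data (not Hidebound's) reproduces the same message; whether this is actually what happens inside Database.read when the cluster dies mid-update is just my guess:

import pandas as pd

data = pd.DataFrame({'col': [1, 2, 3, 4], 'other': list('abcd')})
mask = pd.Series([True, True, True, False])

# Three cells are selected by the mask, but only two values are supplied,
# which raises:
# ValueError: Must have equal len keys and value when setting with an iterable
data.loc[mask, 'col'] = [9, 9]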

The tests can be found here:

test_create fails consistently; when I comment it out, test_create_all_invalid fails instead. This suggests that the failure is a race condition involving whichever test finishes last.

Switching to the following test pattern made them all run slower, but reliably. My guess is that they are now being run serially.

from distributed.utils_test import client as dask_client
from distributed.utils_test import loop, cluster_fixture, cluster


def test(dask_client): 
    ...
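
For completeness, a minimal runnable version of that pattern looks something like this (toy test body, not one of Hidebound's; my understanding is that the client fixture depends on loop and cluster_fixture, and that a fresh local cluster is spun up per test, which would explain the slowdown):

from distributed.utils_test import client as dask_client  # noqa: F401
from distributed.utils_test import loop, cluster_fixture, cluster  # noqa: F401


def test_simple_submit(dask_client):
    # dask_client is a distributed.Client connected to a throwaway local cluster
    future = dask_client.submit(sum, [1, 2, 3])
    assert future.result() == 6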

Unfortunately, this still does not solve the problem.