Using Pytest with LocalClusters

Hi,

I have a set of unit tests which internally create and tear down local clusters:

# Each test creates and tears down its own LocalCluster via the context
# manager, so the cluster is closed when the test body finishes.
def test_one():
  with LocalCluster() as cluster:
    # do something

def test_two():
  with LocalCluster() as cluster:
    # do something

when running these tests sequentially, all tests pass. When running them in parallel (with -n 4 using pytest-xdist), I get the following unclosed resource (socket) error:

[gw0] darwin -- Python 3.10.5 /Users/pietergijsbers/repositories/gama/venv310/bin/python

cls = <class '_pytest.runner.CallInfo'>, func = <function call_runtest_hook.<locals>.<lambda> at 0x16c5391b0>, when = 'setup', reraise = (<class '_pytest.outcomes.Exit'>, <class 'KeyboardInterrupt'>)

    @classmethod
    def from_call(
        cls,
        func: "Callable[[], TResult]",
        when: "Literal['collect', 'setup', 'call', 'teardown']",
        reraise: Optional[
            Union[Type[BaseException], Tuple[Type[BaseException], ...]]
        ] = None,
    ) -> "CallInfo[TResult]":
        """Call func, wrapping the result in a CallInfo.
    
        :param func:
            The function to call. Called without arguments.
        :param when:
            The phase in which the function is called.
        :param reraise:
            Exception or exceptions that shall propagate if raised by the
            function, instead of being wrapped in the CallInfo.
        """
        excinfo = None
        start = timing.time()
        precise_start = timing.perf_counter()
        try:
>           result: Optional[TResult] = func()

venv310/lib/python3.10/site-packages/_pytest/runner.py:338: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
venv310/lib/python3.10/site-packages/_pytest/runner.py:259: in <lambda>
    lambda: ihook(item=item, **kwds), when=when, reraise=reraise
venv310/lib/python3.10/site-packages/pluggy/_hooks.py:265: in __call__
    return self._hookexec(self.name, self.get_hookimpls(), kwargs, firstresult)
venv310/lib/python3.10/site-packages/pluggy/_manager.py:80: in _hookexec
    return self._inner_hookexec(hook_name, methods, kwargs, firstresult)
venv310/lib/python3.10/site-packages/_pytest/unraisableexception.py:83: in pytest_runtest_setup
    yield from unraisable_exception_runtest_hook()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

    def unraisable_exception_runtest_hook() -> Generator[None, None, None]:
        with catch_unraisable_exception() as cm:
            yield
            if cm.unraisable:
                if cm.unraisable.err_msg is not None:
                    err_msg = cm.unraisable.err_msg
                else:
                    err_msg = "Exception ignored in"
                msg = f"{err_msg}: {cm.unraisable.object!r}\n\n"
                msg += "".join(
                    traceback.format_exception(
                        cm.unraisable.exc_type,
                        cm.unraisable.exc_value,
                        cm.unraisable.exc_traceback,
                    )
                )
>               warnings.warn(pytest.PytestUnraisableExceptionWarning(msg))
E               pytest.PytestUnraisableExceptionWarning: Exception ignored in: <socket.socket fd=-1, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6>
E               
E               Traceback (most recent call last):
E                 File "/Users/pietergijsbers/repositories/gama/venv310/lib/python3.10/site-packages/distributed/node.py", line 169, in start_http_server
E                   if retries_left < 1:
E               ResourceWarning: unclosed <socket.socket fd=14, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('0.0.0.0', 0)>

venv310/lib/python3.10/site-packages/_pytest/unraisableexception.py:78: PytestUnraisableExceptionWarning

I haven’t been able to remedy this (I tried using a fixture so that only one cluster is used, but it fails then too). Any suggestions?

I managed to make it work with a shared fixture, but for some scenarios I want to configure the LocalCluster differently. Making a separate fixture for each test works, but seems like bad practice.

@PGijsbers I’m not able to reproduce the ResourceWarning. Could you please share some details about your machine and environment?

I tried functions with sleep() to simulate work:

# Minimal reproduction: sleep() stands in for real work so each test
# holds its LocalCluster open for a few seconds.
def test_one():
    with LocalCluster() as cluster:
        sleep(5)


def test_two():
    with LocalCluster() as cluster:
        sleep(5)

I came up with this, which seems to work.

The idea is

  1. acquire file lock. if this blocks, it’ll do so until whatever worker currently executes step 2 is done and then we’ll succeed in acquiring it.
  2. try to read the address file.
    • if we’re the first worker trying to do that, we won’t find it. in that case we start a local cluster and write its address into the file
    • if not, we’ll find the file, and can just read its content into a variable
  3. we’re done with the file: release the file lock
  4. yield the server address
  5. if we’re the worker that started the cluster, shut it down now (__exit__ of the with block at the end)
@pytest.fixture(scope="session")
def local_cluster_addr(
    tmp_path_factory: pytest.TempPathFactory, worker_id: str
) -> Generator[str, None, None]:
    """Yield the scheduler address of a Dask LocalCluster shared by all xdist workers.

    Exactly one pytest-xdist worker starts the cluster and publishes its
    address in a lock-protected file under the shared temp directory; every
    other worker just reads the address. The worker that started the cluster
    shuts it down at session teardown.

    Adapted from https://pytest-xdist.readthedocs.io/en/latest/how-to.html#making-session-scoped-fixtures-execute-only-once
    """
    import dask.distributed as dd

    def make_cluster() -> dd.LocalCluster:
        # A single worker/thread is enough: tests only need an address to connect to.
        return dd.LocalCluster(n_workers=1, threads_per_worker=1)

    if worker_id == "master":
        # Not running under pytest-xdist: no cross-process coordination needed.
        with make_cluster() as cluster:
            yield cluster.scheduler_address
            return

    # Temp directory shared by all xdist workers of this session.
    root_tmp_dir = tmp_path_factory.getbasetemp().parent

    fn = root_tmp_dir / "dask_scheduler_address.txt"
    lock = FileLock(str(fn) + ".lock")

    # Can't use `with lock:` as a context manager around the yield, because the
    # lock must be released *before* yielding — otherwise every other worker
    # would block on acquire() for the whole session. Instead, guard the
    # critical section with try/finally so the lock is released even when
    # reading the file or starting the cluster raises (the original code
    # leaked the lock on any such failure, deadlocking the other workers).
    lock.acquire()
    cluster = None
    try:
        if fn.is_file():
            # Another worker already started the cluster: reuse its address.
            address = fn.read_text()
        else:
            # We are the first worker here: start the cluster and publish it.
            cluster = make_cluster()
            address = cluster.scheduler_address
            fn.write_text(address)
    except BaseException:
        # Don't leak a cluster whose address never got published.
        if cluster is not None:
            cluster.close()
        raise
    finally:
        lock.release()

    if cluster is None:
        # Reader path: nothing to tear down.
        yield address
        return

    # Owner path: keep the cluster alive for the session, close it on teardown.
    with cluster:
        yield address