Using Pytest with LocalClusters

Hi,

I have a set of unit tests which internally create and tear down local clusters:

def test_one():
  with LocalCluster() as cluster:
    # do something

def test_two():
  with LocalCluster() as cluster:
    # do something

when running these tests sequentially, all tests pass. When running them in parallel (with -n 4 using pytest-xdist), I get the following unclosed resource (socket) error:

[gw0] darwin -- Python 3.10.5 /Users/pietergijsbers/repositories/gama/venv310/bin/python

cls = <class '_pytest.runner.CallInfo'>, func = <function call_runtest_hook.<locals>.<lambda> at 0x16c5391b0>, when = 'setup', reraise = (<class '_pytest.outcomes.Exit'>, <class 'KeyboardInterrupt'>)

    @classmethod
    def from_call(
        cls,
        func: "Callable[[], TResult]",
        when: "Literal['collect', 'setup', 'call', 'teardown']",
        reraise: Optional[
            Union[Type[BaseException], Tuple[Type[BaseException], ...]]
        ] = None,
    ) -> "CallInfo[TResult]":
        """Call func, wrapping the result in a CallInfo.
    
        :param func:
            The function to call. Called without arguments.
        :param when:
            The phase in which the function is called.
        :param reraise:
            Exception or exceptions that shall propagate if raised by the
            function, instead of being wrapped in the CallInfo.
        """
        excinfo = None
        start = timing.time()
        precise_start = timing.perf_counter()
        try:
>           result: Optional[TResult] = func()

venv310/lib/python3.10/site-packages/_pytest/runner.py:338: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
venv310/lib/python3.10/site-packages/_pytest/runner.py:259: in <lambda>
    lambda: ihook(item=item, **kwds), when=when, reraise=reraise
venv310/lib/python3.10/site-packages/pluggy/_hooks.py:265: in __call__
    return self._hookexec(self.name, self.get_hookimpls(), kwargs, firstresult)
venv310/lib/python3.10/site-packages/pluggy/_manager.py:80: in _hookexec
    return self._inner_hookexec(hook_name, methods, kwargs, firstresult)
venv310/lib/python3.10/site-packages/_pytest/unraisableexception.py:83: in pytest_runtest_setup
    yield from unraisable_exception_runtest_hook()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

    def unraisable_exception_runtest_hook() -> Generator[None, None, None]:
        with catch_unraisable_exception() as cm:
            yield
            if cm.unraisable:
                if cm.unraisable.err_msg is not None:
                    err_msg = cm.unraisable.err_msg
                else:
                    err_msg = "Exception ignored in"
                msg = f"{err_msg}: {cm.unraisable.object!r}\n\n"
                msg += "".join(
                    traceback.format_exception(
                        cm.unraisable.exc_type,
                        cm.unraisable.exc_value,
                        cm.unraisable.exc_traceback,
                    )
                )
>               warnings.warn(pytest.PytestUnraisableExceptionWarning(msg))
E               pytest.PytestUnraisableExceptionWarning: Exception ignored in: <socket.socket fd=-1, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6>
E               
E               Traceback (most recent call last):
E                 File "/Users/pietergijsbers/repositories/gama/venv310/lib/python3.10/site-packages/distributed/node.py", line 169, in start_http_server
E                   if retries_left < 1:
E               ResourceWarning: unclosed <socket.socket fd=14, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('0.0.0.0', 0)>

venv310/lib/python3.10/site-packages/_pytest/unraisableexception.py:78: PytestUnraisableExceptionWarning

I haven’t been able to remedy this (I tried using a fixture so only one cluster is used, but then too it fails). Any suggestions?

I managed to make with work with a shared fixture, but for some scenarios I want to configure the LocalCluster differently. Making a fixture for each test works, but seems bad practice.

@PGijsbers I’m not able to reproduce the ResourceWarning. Could you please share some details about your machine and environment?

I tried functions with sleep() to simulate work:

def test_one():
    with LocalCluster() as cluster:
        sleep(5)


def test_two():
    with LocalCluster() as cluster:
        sleep(5)