Why does Client.compute(z, workers={z: '127.0.0.1'}) not work?

def f(n):
    """Echo ``n`` to stdout and hand the same value back unchanged."""
    print(n)
    return n

def g(a, b):
    """Print both operands, then return the result of ``a + b``."""
    print(a, b)
    combined = a + b
    return combined

# Build a small lazy graph: z = g(f(1), f(2)).
x = dask.delayed(f)(1)
y = dask.delayed(f)(2)
z = dask.delayed(g)(x, y)

if __name__ == '__main__':
    client = Client('xxx:8786')
    # Passing a dict keyed by the Delayed object fails: the message sent to the
    # scheduler cannot be serialized ("can not serialize 'Delayed' object") and
    # the returned future ends up cancelled (see the traceback below).
    # NOTE: this assignment also rebinds the name `f`, shadowing the function f.
    f = client.compute(z, workers={z: 'worker206'})  # not working
    # Passing a plain list of worker names instead does work:
    # f = client.compute(z, workers=['worker206'])
    print(f.result())

Error message:

root@localhost:/data/demo# python3 setworker.py 
/usr/local/lib/python3.10/dist-packages/distributed/client.py:1348: VersionMismatchWarning: Mismatched versions found

+---------+--------+-----------+------------------+
| Package | Client | Scheduler | Workers          |
+---------+--------+-----------+------------------+
| numpy   | None   | 1.23.4    | {'1.23.4', None} |
+---------+--------+-----------+------------------+
  warnings.warn(version_module.VersionMismatchWarning(msg[0]["warning"]))
2022-11-23 00:48:37,546 - distributed.protocol.core - CRITICAL - Failed to Serialize
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/distributed/protocol/core.py", line 109, in dumps
    frames[0] = msgpack.dumps(msg, default=_encode_default, use_bin_type=True)
  File "/usr/local/lib/python3.10/dist-packages/msgpack/__init__.py", line 38, in packb
    return Packer(**kwargs).pack(o)
  File "msgpack/_packer.pyx", line 294, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 300, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 297, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 229, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 291, in msgpack._cmsgpack.Packer._pack
TypeError: can not serialize 'Delayed' object
2022-11-23 00:48:37,547 - distributed.comm.utils - INFO - Unserializable Message: [{'op': 'update-graph-hlg', 'hlg': {'layers': [{'__module__': 'dask.highlevelgraph', '__name__': 'MaterializedLayer', 'state': {'dsk': {'f-00361204-9fe7-4b25-8311-9c49bd44bc61': {'function': b'\x80\x04\x95\xce\x01\x00\x00\x00\x00\x00\x00\x8c\x17cloudpickle.cloudpickle\x94\x8c\x0e_make_function\x94\x93\x94(h\x00\x8c\r_builtin_type\x94\x93\x94\x8c\x08CodeType\x94\x85\x94R\x94(K\x01K\x00K\x00K\x01K\x02KCC\x0ct\x00|\x00\x83\x01\x01\x00|\x00S\x00\x94N\x85\x94\x8c\x05print\x94\x85\x94\x8c\x01n\x94\x85\x94\x8c\x17/data/demo/setworker.py\x94\x8c\x01f\x94K\x08C\x04\x08\x01\x04\x01\x94))t\x94R\x94}\x94(\x8c\x0b__package__\x94N\x8c\x08__name__\x94\x8c\x08__main__\x94\x8c\x08__file__\x94h\x0euNNNt\x94R\x94\x8c\x1ccloudpickle.cloudpickle_fast\x94\x8c\x12_function_setstate\x94\x93\x94h\x19}\x94}\x94(h\x15h\x0f\x8c\x0c__qualname__\x94h\x0f\x8c\x0f__annotations__\x94}\x94\x8c\x0e__kwdefaults__\x94N\x8c\x0c__defaults__\x94N\x8c\n__module__\x94h\x16\x8c\x07__doc__\x94N\x8c\x0b__closure__\x94N\x8c\x17_cloudpickle_submodules\x94]\x94\x8c\x0b__globals__\x94}\x94u\x86\x94\x86R0.', 'args': b'\x80\x04\x95\x05\x00\x00\x00\x00\x00\x00\x00K\x02\x85\x94.'}}, 'dependencies': {'f-00361204-9fe7-4b25-8311-9c49bd44bc61': set()}}, 'annotations': {'workers': [{Delayed('g-35948fef-4e67-4473-a693-e8201fabf31b'): 'worker206'}]}}, {'__module__': 'dask.highlevelgraph', '__name__': 'MaterializedLayer', 'state': {'dsk': {'f-6107563f-b58c-4102-b394-a691eb639d0f': {'function': 
b'\x80\x04\x95\xce\x01\x00\x00\x00\x00\x00\x00\x8c\x17cloudpickle.cloudpickle\x94\x8c\x0e_make_function\x94\x93\x94(h\x00\x8c\r_builtin_type\x94\x93\x94\x8c\x08CodeType\x94\x85\x94R\x94(K\x01K\x00K\x00K\x01K\x02KCC\x0ct\x00|\x00\x83\x01\x01\x00|\x00S\x00\x94N\x85\x94\x8c\x05print\x94\x85\x94\x8c\x01n\x94\x85\x94\x8c\x17/data/demo/setworker.py\x94\x8c\x01f\x94K\x08C\x04\x08\x01\x04\x01\x94))t\x94R\x94}\x94(\x8c\x0b__package__\x94N\x8c\x08__name__\x94\x8c\x08__main__\x94\x8c\x08__file__\x94h\x0euNNNt\x94R\x94\x8c\x1ccloudpickle.cloudpickle_fast\x94\x8c\x12_function_setstate\x94\x93\x94h\x19}\x94}\x94(h\x15h\x0f\x8c\x0c__qualname__\x94h\x0f\x8c\x0f__annotations__\x94}\x94\x8c\x0e__kwdefaults__\x94N\x8c\x0c__defaults__\x94N\x8c\n__module__\x94h\x16\x8c\x07__doc__\x94N\x8c\x0b__closure__\x94N\x8c\x17_cloudpickle_submodules\x94]\x94\x8c\x0b__globals__\x94}\x94u\x86\x94\x86R0.', 'args': b'\x80\x04\x95\x05\x00\x00\x00\x00\x00\x00\x00K\x01\x85\x94.'}}, 'dependencies': {'f-6107563f-b58c-4102-b394-a691eb639d0f': set()}}, 'annotations': {'workers': [{Delayed('g-35948fef-4e67-4473-a693-e8201fabf31b'): 'worker206'}]}}, {'__module__': 'dask.highlevelgraph', '__name__': 'MaterializedLayer', 'state': {'dsk': {'g-35948fef-4e67-4473-a693-e8201fabf31b': {'function': 
b'\x80\x04\x95\xd8\x01\x00\x00\x00\x00\x00\x00\x8c\x17cloudpickle.cloudpickle\x94\x8c\x0e_make_function\x94\x93\x94(h\x00\x8c\r_builtin_type\x94\x93\x94\x8c\x08CodeType\x94\x85\x94R\x94(K\x02K\x00K\x00K\x02K\x03KCC\x12t\x00|\x00|\x01\x83\x02\x01\x00|\x00|\x01\x17\x00S\x00\x94N\x85\x94\x8c\x05print\x94\x85\x94\x8c\x01a\x94\x8c\x01b\x94\x86\x94\x8c\x17/data/demo/setworker.py\x94\x8c\x01g\x94K\x0cC\x04\n\x01\x08\x01\x94))t\x94R\x94}\x94(\x8c\x0b__package__\x94N\x8c\x08__name__\x94\x8c\x08__main__\x94\x8c\x08__file__\x94h\x0fuNNNt\x94R\x94\x8c\x1ccloudpickle.cloudpickle_fast\x94\x8c\x12_function_setstate\x94\x93\x94h\x1a}\x94}\x94(h\x16h\x10\x8c\x0c__qualname__\x94h\x10\x8c\x0f__annotations__\x94}\x94\x8c\x0e__kwdefaults__\x94N\x8c\x0c__defaults__\x94N\x8c\n__module__\x94h\x17\x8c\x07__doc__\x94N\x8c\x0b__closure__\x94N\x8c\x17_cloudpickle_submodules\x94]\x94\x8c\x0b__globals__\x94}\x94u\x86\x94\x86R0.', 'args': b'\x80\x04\x95U\x00\x00\x00\x00\x00\x00\x00\x8c&f-6107563f-b58c-4102-b394-a691eb639d0f\x94\x8c&f-00361204-9fe7-4b25-8311-9c49bd44bc61\x94\x86\x94.'}}, 'dependencies': {'g-35948fef-4e67-4473-a693-e8201fabf31b': {'f-6107563f-b58c-4102-b394-a691eb639d0f', 'f-00361204-9fe7-4b25-8311-9c49bd44bc61'}}}, 'annotations': {'workers': [{Delayed('g-35948fef-4e67-4473-a693-e8201fabf31b'): 'worker206'}]}}, {'__module__': 'dask.highlevelgraph', '__name__': 'MaterializedLayer', 'state': {'dsk': {}, 'dependencies': {}}, 'annotations': {'workers': [{Delayed('g-35948fef-4e67-4473-a693-e8201fabf31b'): 'worker206'}]}}]}, 'keys': ['g-35948fef-4e67-4473-a693-e8201fabf31b'], 'priority': None, 'submitting_task': None, 'fifo_timeout': '60s', 'actors': None, 'code': "import dask, time\nimport networkx as nx\nfrom importlib import import_module\nfrom dask.distributed import Client\nfrom dask.utils import apply\n#from tasks import add\n\ndef f(n):\n    print(n)\n    return n\n\ndef g(a, b):\n    print(a, b)\n    return a + b\n\nx = dask.delayed(f)(1)\ny = dask.delayed(f)(2)\nz = 
dask.delayed(g)(x, y)\n\nif __name__ == '__main__':\n    client = Client('148.153.45.202:8786')\n    f = client.compute(z, workers={z: 'worker206'})\n    #f = client.compute(z, workers=['worker206'])\n    print(f.result())\n"}]
2022-11-23 00:48:37,547 - distributed.comm.utils - ERROR - can not serialize 'Delayed' object
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/distributed/comm/utils.py", line 55, in _to_frames
    return list(protocol.dumps(msg, **kwargs))
  File "/usr/local/lib/python3.10/dist-packages/distributed/protocol/core.py", line 109, in dumps
    frames[0] = msgpack.dumps(msg, default=_encode_default, use_bin_type=True)
  File "/usr/local/lib/python3.10/dist-packages/msgpack/__init__.py", line 38, in packb
    return Packer(**kwargs).pack(o)
  File "msgpack/_packer.pyx", line 294, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 300, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 297, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 229, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 291, in msgpack._cmsgpack.Packer._pack
TypeError: can not serialize 'Delayed' object
2022-11-23 00:48:37,547 - distributed.batched - ERROR - Error in batched write
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/distributed/batched.py", line 115, in _background_send
    nbytes = yield coro
  File "/usr/local/lib/python3.10/dist-packages/tornado/gen.py", line 762, in run
    value = future.result()
  File "/usr/local/lib/python3.10/dist-packages/distributed/comm/tcp.py", line 271, in write
    frames = await to_frames(
  File "/usr/local/lib/python3.10/dist-packages/distributed/comm/utils.py", line 72, in to_frames
    return _to_frames()
  File "/usr/local/lib/python3.10/dist-packages/distributed/comm/utils.py", line 55, in _to_frames
    return list(protocol.dumps(msg, **kwargs))
  File "/usr/local/lib/python3.10/dist-packages/distributed/protocol/core.py", line 109, in dumps
    frames[0] = msgpack.dumps(msg, default=_encode_default, use_bin_type=True)
  File "/usr/local/lib/python3.10/dist-packages/msgpack/__init__.py", line 38, in packb
    return Packer(**kwargs).pack(o)
  File "msgpack/_packer.pyx", line 294, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 300, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 297, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 229, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 291, in msgpack._cmsgpack.Packer._pack
TypeError: can not serialize 'Delayed' object
Traceback (most recent call last):
  File "/data/demo/setworker.py", line 24, in <module>
    print(f.result())
  File "/usr/local/lib/python3.10/dist-packages/distributed/client.py", line 281, in result
    raise result
concurrent.futures._base.CancelledError: g-35948fef-4e67-4473-a693-e8201fabf31b

Am I using it the right way?

If you want to restrict only z (and not x or y) to a specific worker, create z inside a dask.annotate block:

# x and y are created outside the annotate block, so they carry no worker
# restriction.
x = dask.delayed(f)(1)
y = dask.delayed(f)(2)
# Only the task created inside this block (z) is annotated with the worker
# restriction.
with dask.annotate(workers=['worker206']):
    z = dask.delayed(g)(x, y)

client = Client('xxx:8786')
# No workers= argument is needed here; the restriction travels with the graph.
# NOTE: this rebinds the name `f` to the returned future.
f = client.compute(z)

[/quote]