def f(n):
    """Print ``n`` and return it unchanged."""
    print(n)
    return n
def g(a, b):
    """Print both arguments and return ``a + b``."""
    print(a, b)
    return a + b
# Build the task graph: two delayed calls to f feed one delayed call to g.
x = dask.delayed(f)(1)
y = dask.delayed(f)(2)
# z takes x and y as inputs, so the g task depends on both f tasks.
z = dask.delayed(g)(x, y)
if __name__ == '__main__':
    # Use the client as a context manager so the scheduler connection is
    # closed even when the computation below raises.
    with Client('xxx:8786') as client:
        # NOTE: keying the ``workers`` dict by the Delayed object itself is
        # what the log below complains about ("can not serialize 'Delayed'
        # object"): the dict is sent along as the graph's 'workers' annotation.
        # The result is bound to ``future`` so it does not shadow function ``f``.
        future = client.compute(z, workers={z: 'worker206'})  # not working
        # Passing a plain list of worker names works instead:
        # future = client.compute(z, workers=['worker206'])
        print(future.result())
Error message:
root@localhost:/data/demo# python3 setworker.py
/usr/local/lib/python3.10/dist-packages/distributed/client.py:1348: VersionMismatchWarning: Mismatched versions found
+---------+--------+-----------+------------------+
| Package | Client | Scheduler | Workers |
+---------+--------+-----------+------------------+
| numpy | None | 1.23.4 | {'1.23.4', None} |
+---------+--------+-----------+------------------+
warnings.warn(version_module.VersionMismatchWarning(msg[0]["warning"]))
2022-11-23 00:48:37,546 - distributed.protocol.core - CRITICAL - Failed to Serialize
Traceback (most recent call last):
File "/usr/local/lib/python3.10/dist-packages/distributed/protocol/core.py", line 109, in dumps
frames[0] = msgpack.dumps(msg, default=_encode_default, use_bin_type=True)
File "/usr/local/lib/python3.10/dist-packages/msgpack/__init__.py", line 38, in packb
return Packer(**kwargs).pack(o)
File "msgpack/_packer.pyx", line 294, in msgpack._cmsgpack.Packer.pack
File "msgpack/_packer.pyx", line 300, in msgpack._cmsgpack.Packer.pack
File "msgpack/_packer.pyx", line 297, in msgpack._cmsgpack.Packer.pack
File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 229, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 291, in msgpack._cmsgpack.Packer._pack
TypeError: can not serialize 'Delayed' object
2022-11-23 00:48:37,547 - distributed.comm.utils - INFO - Unserializable Message: [{'op': 'update-graph-hlg', 'hlg': {'layers': [{'__module__': 'dask.highlevelgraph', '__name__': 'MaterializedLayer', 'state': {'dsk': {'f-00361204-9fe7-4b25-8311-9c49bd44bc61': {'function': b'\x80\x04\x95\xce\x01\x00\x00\x00\x00\x00\x00\x8c\x17cloudpickle.cloudpickle\x94\x8c\x0e_make_function\x94\x93\x94(h\x00\x8c\r_builtin_type\x94\x93\x94\x8c\x08CodeType\x94\x85\x94R\x94(K\x01K\x00K\x00K\x01K\x02KCC\x0ct\x00|\x00\x83\x01\x01\x00|\x00S\x00\x94N\x85\x94\x8c\x05print\x94\x85\x94\x8c\x01n\x94\x85\x94\x8c\x17/data/demo/setworker.py\x94\x8c\x01f\x94K\x08C\x04\x08\x01\x04\x01\x94))t\x94R\x94}\x94(\x8c\x0b__package__\x94N\x8c\x08__name__\x94\x8c\x08__main__\x94\x8c\x08__file__\x94h\x0euNNNt\x94R\x94\x8c\x1ccloudpickle.cloudpickle_fast\x94\x8c\x12_function_setstate\x94\x93\x94h\x19}\x94}\x94(h\x15h\x0f\x8c\x0c__qualname__\x94h\x0f\x8c\x0f__annotations__\x94}\x94\x8c\x0e__kwdefaults__\x94N\x8c\x0c__defaults__\x94N\x8c\n__module__\x94h\x16\x8c\x07__doc__\x94N\x8c\x0b__closure__\x94N\x8c\x17_cloudpickle_submodules\x94]\x94\x8c\x0b__globals__\x94}\x94u\x86\x94\x86R0.', 'args': b'\x80\x04\x95\x05\x00\x00\x00\x00\x00\x00\x00K\x02\x85\x94.'}}, 'dependencies': {'f-00361204-9fe7-4b25-8311-9c49bd44bc61': set()}}, 'annotations': {'workers': [{Delayed('g-35948fef-4e67-4473-a693-e8201fabf31b'): 'worker206'}]}}, {'__module__': 'dask.highlevelgraph', '__name__': 'MaterializedLayer', 'state': {'dsk': {'f-6107563f-b58c-4102-b394-a691eb639d0f': {'function': 
b'\x80\x04\x95\xce\x01\x00\x00\x00\x00\x00\x00\x8c\x17cloudpickle.cloudpickle\x94\x8c\x0e_make_function\x94\x93\x94(h\x00\x8c\r_builtin_type\x94\x93\x94\x8c\x08CodeType\x94\x85\x94R\x94(K\x01K\x00K\x00K\x01K\x02KCC\x0ct\x00|\x00\x83\x01\x01\x00|\x00S\x00\x94N\x85\x94\x8c\x05print\x94\x85\x94\x8c\x01n\x94\x85\x94\x8c\x17/data/demo/setworker.py\x94\x8c\x01f\x94K\x08C\x04\x08\x01\x04\x01\x94))t\x94R\x94}\x94(\x8c\x0b__package__\x94N\x8c\x08__name__\x94\x8c\x08__main__\x94\x8c\x08__file__\x94h\x0euNNNt\x94R\x94\x8c\x1ccloudpickle.cloudpickle_fast\x94\x8c\x12_function_setstate\x94\x93\x94h\x19}\x94}\x94(h\x15h\x0f\x8c\x0c__qualname__\x94h\x0f\x8c\x0f__annotations__\x94}\x94\x8c\x0e__kwdefaults__\x94N\x8c\x0c__defaults__\x94N\x8c\n__module__\x94h\x16\x8c\x07__doc__\x94N\x8c\x0b__closure__\x94N\x8c\x17_cloudpickle_submodules\x94]\x94\x8c\x0b__globals__\x94}\x94u\x86\x94\x86R0.', 'args': b'\x80\x04\x95\x05\x00\x00\x00\x00\x00\x00\x00K\x01\x85\x94.'}}, 'dependencies': {'f-6107563f-b58c-4102-b394-a691eb639d0f': set()}}, 'annotations': {'workers': [{Delayed('g-35948fef-4e67-4473-a693-e8201fabf31b'): 'worker206'}]}}, {'__module__': 'dask.highlevelgraph', '__name__': 'MaterializedLayer', 'state': {'dsk': {'g-35948fef-4e67-4473-a693-e8201fabf31b': {'function': 
b'\x80\x04\x95\xd8\x01\x00\x00\x00\x00\x00\x00\x8c\x17cloudpickle.cloudpickle\x94\x8c\x0e_make_function\x94\x93\x94(h\x00\x8c\r_builtin_type\x94\x93\x94\x8c\x08CodeType\x94\x85\x94R\x94(K\x02K\x00K\x00K\x02K\x03KCC\x12t\x00|\x00|\x01\x83\x02\x01\x00|\x00|\x01\x17\x00S\x00\x94N\x85\x94\x8c\x05print\x94\x85\x94\x8c\x01a\x94\x8c\x01b\x94\x86\x94\x8c\x17/data/demo/setworker.py\x94\x8c\x01g\x94K\x0cC\x04\n\x01\x08\x01\x94))t\x94R\x94}\x94(\x8c\x0b__package__\x94N\x8c\x08__name__\x94\x8c\x08__main__\x94\x8c\x08__file__\x94h\x0fuNNNt\x94R\x94\x8c\x1ccloudpickle.cloudpickle_fast\x94\x8c\x12_function_setstate\x94\x93\x94h\x1a}\x94}\x94(h\x16h\x10\x8c\x0c__qualname__\x94h\x10\x8c\x0f__annotations__\x94}\x94\x8c\x0e__kwdefaults__\x94N\x8c\x0c__defaults__\x94N\x8c\n__module__\x94h\x17\x8c\x07__doc__\x94N\x8c\x0b__closure__\x94N\x8c\x17_cloudpickle_submodules\x94]\x94\x8c\x0b__globals__\x94}\x94u\x86\x94\x86R0.', 'args': b'\x80\x04\x95U\x00\x00\x00\x00\x00\x00\x00\x8c&f-6107563f-b58c-4102-b394-a691eb639d0f\x94\x8c&f-00361204-9fe7-4b25-8311-9c49bd44bc61\x94\x86\x94.'}}, 'dependencies': {'g-35948fef-4e67-4473-a693-e8201fabf31b': {'f-6107563f-b58c-4102-b394-a691eb639d0f', 'f-00361204-9fe7-4b25-8311-9c49bd44bc61'}}}, 'annotations': {'workers': [{Delayed('g-35948fef-4e67-4473-a693-e8201fabf31b'): 'worker206'}]}}, {'__module__': 'dask.highlevelgraph', '__name__': 'MaterializedLayer', 'state': {'dsk': {}, 'dependencies': {}}, 'annotations': {'workers': [{Delayed('g-35948fef-4e67-4473-a693-e8201fabf31b'): 'worker206'}]}}]}, 'keys': ['g-35948fef-4e67-4473-a693-e8201fabf31b'], 'priority': None, 'submitting_task': None, 'fifo_timeout': '60s', 'actors': None, 'code': "import dask, time\nimport networkx as nx\nfrom importlib import import_module\nfrom dask.distributed import Client\nfrom dask.utils import apply\n#from tasks import add\n\ndef f(n):\n print(n)\n return n\n\ndef g(a, b):\n print(a, b)\n return a + b\n\nx = dask.delayed(f)(1)\ny = dask.delayed(f)(2)\nz = dask.delayed(g)(x, 
y)\n\nif __name__ == '__main__':\n client = Client('148.153.45.202:8786')\n f = client.compute(z, workers={z: 'worker206'})\n #f = client.compute(z, workers=['worker206'])\n print(f.result())\n"}]
2022-11-23 00:48:37,547 - distributed.comm.utils - ERROR - can not serialize 'Delayed' object
Traceback (most recent call last):
File "/usr/local/lib/python3.10/dist-packages/distributed/comm/utils.py", line 55, in _to_frames
return list(protocol.dumps(msg, **kwargs))
File "/usr/local/lib/python3.10/dist-packages/distributed/protocol/core.py", line 109, in dumps
frames[0] = msgpack.dumps(msg, default=_encode_default, use_bin_type=True)
File "/usr/local/lib/python3.10/dist-packages/msgpack/__init__.py", line 38, in packb
return Packer(**kwargs).pack(o)
File "msgpack/_packer.pyx", line 294, in msgpack._cmsgpack.Packer.pack
File "msgpack/_packer.pyx", line 300, in msgpack._cmsgpack.Packer.pack
File "msgpack/_packer.pyx", line 297, in msgpack._cmsgpack.Packer.pack
File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 229, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 291, in msgpack._cmsgpack.Packer._pack
TypeError: can not serialize 'Delayed' object
2022-11-23 00:48:37,547 - distributed.batched - ERROR - Error in batched write
Traceback (most recent call last):
File "/usr/local/lib/python3.10/dist-packages/distributed/batched.py", line 115, in _background_send
nbytes = yield coro
File "/usr/local/lib/python3.10/dist-packages/tornado/gen.py", line 762, in run
value = future.result()
File "/usr/local/lib/python3.10/dist-packages/distributed/comm/tcp.py", line 271, in write
frames = await to_frames(
File "/usr/local/lib/python3.10/dist-packages/distributed/comm/utils.py", line 72, in to_frames
return _to_frames()
File "/usr/local/lib/python3.10/dist-packages/distributed/comm/utils.py", line 55, in _to_frames
return list(protocol.dumps(msg, **kwargs))
File "/usr/local/lib/python3.10/dist-packages/distributed/protocol/core.py", line 109, in dumps
frames[0] = msgpack.dumps(msg, default=_encode_default, use_bin_type=True)
File "/usr/local/lib/python3.10/dist-packages/msgpack/__init__.py", line 38, in packb
return Packer(**kwargs).pack(o)
File "msgpack/_packer.pyx", line 294, in msgpack._cmsgpack.Packer.pack
File "msgpack/_packer.pyx", line 300, in msgpack._cmsgpack.Packer.pack
File "msgpack/_packer.pyx", line 297, in msgpack._cmsgpack.Packer.pack
File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 229, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 291, in msgpack._cmsgpack.Packer._pack
TypeError: can not serialize 'Delayed' object
Traceback (most recent call last):
File "/data/demo/setworker.py", line 24, in <module>
print(f.result())
File "/usr/local/lib/python3.10/dist-packages/distributed/client.py", line 281, in result
raise result
concurrent.futures._base.CancelledError: g-35948fef-4e67-4473-a693-e8201fabf31b
Am I using it the right way?