While running embarrassingly parallel computations with dask.compute(list_of_delayed_objects) on a Google Cloud deployment, using a dask-gateway cluster and a distributed client, I often run into messages like
2023-06-21 15:55:59,925 - distributed.client - WARNING - Couldn't gather 20042 keys, rescheduling {'mean_field_lsq-5917ffc7-0517-4d76-b64f-06fcdda84591': () …
This seems to happen during the last ~1% of the computation. The dashboard progress bar then reverses, and it looks like it goes back by the same number of tasks as there are keys in the warning. Sometimes the warning is also followed by a CancelledError, as in the example below. That example consisted of 10 000 tasks, and the rescheduling started when there were 10 tasks left.
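For context, this is roughly how the cluster, client, and delayed objects are set up (a minimal sketch with stand-ins; the real get_delayed_objects, window_size, Nblocks, and worker count are not shown here):
import dask
from dask import delayed
from dask_gateway import Gateway

# Rough sketch of the deployment (not the exact code): a dask-gateway cluster
# on Google Cloud with a distributed client attached to it.
gateway = Gateway()
cluster = gateway.new_cluster()
cluster.scale(20)                      # illustrative worker count
client = cluster.get_client()

@delayed
def dummy_task(i, block, window_size):
    # Stand-in for the real per-block work (the 'optimize'/'mean_field_lsq' tasks).
    return i, block, window_size

def get_delayed_objects(i, window_size, Nblocks):
    # Stand-in for the real function: returns a list of independent delayed objects.
    return [dummy_task(i, block, window_size) for block in range(Nblocks)]

window_size, Nblocks = 100, 1000       # made-up numbers for the sketch
results = dask.compute(get_delayed_objects(0, window_size, Nblocks))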
Example 1
Cores per worker: 3. Memory: 6GB
%%time
results = dask.compute(get_delayed_objects( 0, window_size, Nblocks ),
get_delayed_objects( 1, window_size, Nblocks ),
get_delayed_objects( 2, window_size, Nblocks ),
get_delayed_objects( 3, window_size, Nblocks ),
get_delayed_objects( 4, window_size, Nblocks ),
get_delayed_objects( 5, window_size, Nblocks ),
get_delayed_objects( 6, window_size, Nblocks ),
get_delayed_objects( 7, window_size, Nblocks ),
get_delayed_objects( 8, window_size, Nblocks ),
get_delayed_objects( 9, window_size, Nblocks )
)
2023-08-11 12:08:55,405 - distributed.client - WARNING - Couldn't gather 15 keys, rescheduling {'optimize-708294f0-14a4-4ce8-9c28-bfbf6469c128': (), 'optimize-84e96850-0446-4d0b-a67a-53cadc0c3d01': (), 'optimize-7a0c0a45-1519-495f-a767-8413af0a6a60': (), 'optimize-fadba322-23f6-456c-a57d-dc9ee6d1293e': (), 'optimize-3835d312-033d-48d1-8e3d-42ea0ed87358': (), 'optimize-2c76a50f-4e6e-4cc5-8279-b4babf498228': (), 'optimize-3c743ba9-80d4-4438-a924-214a08520da0': (), 'optimize-ba67a080-fdd8-426d-9464-7120d2f92e27': (), 'optimize-515c7969-7c02-45f2-bbd1-934fd3c8c44e': (), 'optimize-77b98835-cdc4-419d-af5d-aa00943b6625': (), 'optimize-772d6fee-c66b-499c-928e-f66aa0de139e': (), 'optimize-87943e23-9fe9-4abc-96d3-5816216aafc8': (), 'optimize-392a3b7a-3d4d-41cf-a7db-d600cf772f6d': (), 'optimize-b0602ae0-8ca2-417d-b4c6-dcab5a3365d6': (), 'optimize-a9c2e4c7-65b2-4269-beee-08ece7f83ca8': ()}
---------------------------------------------------------------------------
CancelledError Traceback (most recent call last)
File <timed exec>:2
File /srv/conda/envs/notebook/lib/python3.10/site-packages/dask/base.py:600, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
597 keys.append(x.__dask_keys__())
598 postcomputes.append(x.__dask_postcompute__())
--> 600 results = schedule(dsk, keys, **kwargs)
601 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
File /srv/conda/envs/notebook/lib/python3.10/site-packages/distributed/client.py:3122, in Client.get(self, dsk, keys, workers, allow_other_workers, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
3120 should_rejoin = False
3121 try:
-> 3122 results = self.gather(packed, asynchronous=asynchronous, direct=direct)
3123 finally:
3124 for f in futures.values():
File /srv/conda/envs/notebook/lib/python3.10/site-packages/distributed/client.py:2291, in Client.gather(self, futures, errors, direct, asynchronous)
2289 else:
2290 local_worker = None
-> 2291 return self.sync(
2292 self._gather,
2293 futures,
2294 errors=errors,
2295 direct=direct,
2296 local_worker=local_worker,
2297 asynchronous=asynchronous,
2298 )
File /srv/conda/envs/notebook/lib/python3.10/site-packages/distributed/utils.py:339, in SyncMethodMixin.sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
337 return future
338 else:
--> 339 return sync(
340 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
341 )
File /srv/conda/envs/notebook/lib/python3.10/site-packages/distributed/utils.py:406, in sync(loop, func, callback_timeout, *args, **kwargs)
404 if error:
405 typ, exc, tb = error
--> 406 raise exc.with_traceback(tb)
407 else:
408 return result
File /srv/conda/envs/notebook/lib/python3.10/site-packages/distributed/utils.py:379, in sync.<locals>.f()
377 future = asyncio.wait_for(future, callback_timeout)
378 future = asyncio.ensure_future(future)
--> 379 result = yield future
380 except Exception:
381 error = sys.exc_info()
File /srv/conda/envs/notebook/lib/python3.10/site-packages/tornado/gen.py:769, in Runner.run(self)
766 exc_info = None
768 try:
--> 769 value = future.result()
770 except Exception:
771 exc_info = sys.exc_info()
File /srv/conda/envs/notebook/lib/python3.10/site-packages/distributed/client.py:2155, in Client._gather(self, futures, errors, direct, local_worker)
2153 else:
2154 raise exception.with_traceback(traceback)
-> 2155 raise exc
2156 if errors == "skip":
2157 bad_keys.add(key)
CancelledError: optimize-ebb00945-ca13-4947-9365-2e6aa2bb0f3b
- What types of tasks get rescheduled by the client? That is, what does this warning imply about my tasks that I am missing? I don't understand why it apparently finishes a task and then reschedules it, recomputing work that has already been done.
I read that increasing memory could solve a similar error (How do I solve "distributed.scheduler - ERROR - Couldn't gather keys"? - #2 by pavithraes). Doubling the memory per worker from Example 1 solves the CancelledError, but tasks still get rescheduled, as shown in Example 2 (a sketch of how the worker memory is set follows below).
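For reference, this is roughly how the worker resources are configured (a sketch; the exact option names exposed by the gateway, here assumed to be worker_cores and worker_memory, depend on the deployment):
from dask_gateway import Gateway

gateway = Gateway()
options = gateway.cluster_options()
# Deployment-specific option names; worker_cores/worker_memory are assumptions here.
options.worker_cores = 3
options.worker_memory = 12     # GB per worker; 6 GB in Example 1, doubled for Example 2
cluster = gateway.new_cluster(options)
client = cluster.get_client()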
Example 2
Cores per worker: 3. Memory: 12GB
%%time
results = dask.compute(get_delayed_objects( 0, window_size, Nblocks ),
get_delayed_objects( 1, window_size, Nblocks ),
get_delayed_objects( 2, window_size, Nblocks ),
get_delayed_objects( 3, window_size, Nblocks ),
get_delayed_objects( 4, window_size, Nblocks ),
get_delayed_objects( 5, window_size, Nblocks ),
get_delayed_objects( 6, window_size, Nblocks ),
get_delayed_objects( 7, window_size, Nblocks ),
get_delayed_objects( 8, window_size, Nblocks ),
get_delayed_objects( 9, window_size, Nblocks )
)
2023-08-11 13:33:14,811 - distributed.client - WARNING - Couldn't gather 13 keys, rescheduling {'optimize-515a7b36-a66e-4770-89a5-2a88913b9067': (), 'optimize-dc2ddfa7-0052-42d4-b7cd-425f99c9561b': (), 'optimize-568a94ec-08a3-4c2c-a302-91b0023248cd': (), 'optimize-30801cfc-0ca7-47f2-b90f-9448490cb994': (), 'optimize-afd5d0b6-6015-4e33-a0c0-8ee3464d60cb': (), 'optimize-81fca5e5-a6fe-4bf8-9d4f-fbaf3a787503': (), 'optimize-526c6040-3462-4acc-8137-84777ef2fdd1': (), 'optimize-f410368d-150f-49fc-9699-3e6994d62533': (), 'optimize-0cedfd8c-0130-406e-ba25-bdb9fc0a215e': (), 'optimize-368b5de0-d638-4100-8f57-92575c9de04d': (), 'optimize-ed1d33e4-6b5e-497e-97e4-32d716c9029c': (), 'optimize-014e3a40-0cc9-4410-bba6-38843682253b': (), 'optimize-b07cbdbb-02ae-4e6d-9cca-af6c468de09e': ()}
2023-08-11 13:39:25,751 - distributed.client - WARNING - Couldn't gather 10 keys, rescheduling {'optimize-b6cb06fa-a7a6-4230-bab3-4357d031e543': (), 'optimize-dc9bb730-10f1-4eea-bb18-4b28a111256c': (), 'optimize-11f4df18-ac0e-493b-b923-37964cc84ef5': (), 'optimize-467c36b3-3fa3-4c2d-a3db-f62fa453eba8': (), 'optimize-6f392b12-f51a-4bb1-8a0b-e0ef89d1e68a': (), 'optimize-da48e050-8fe8-4e90-a75e-4b1be757e955': (), 'optimize-aa18fa0f-c1e2-40ec-955b-7f53496bf97c': (), 'optimize-1c573fc7-baa5-48ee-98c9-f591a78076bc': (), 'optimize-2e60a310-ea07-4c96-8361-74bf4a206c64': (), 'optimize-348dff59-fbcb-451c-af7c-3c67715ba707': ()}
CPU times: user 1min 3s, sys: 3.06 s, total: 1min 6s
Wall time: 1h 20min 47s
The total run took 1 h 20 min, and the last 10 tasks alone took 26 minutes because of all the rescheduling. So increasing memory seems to help with the CancelledError, but not with the rescheduling.
- From the Worker Memory Use table in the dashboard, it looks like the workers do not need the 12GB I give them, so why does increasing worker_memory help? (A possible way to double-check the per-worker memory outside the dashboard is sketched after these questions.)
- What can I do to make the computation finish without spending time on rescheduling tasks? I could try to put together a reproducible example, but it looks like similar errors have already been reproduced (below), so I am of course hoping there is a simple fix!
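The double check mentioned above could look something like this (a sketch on my side, assuming psutil is importable on the workers, which it should be since distributed depends on it):
def worker_rss_gb():
    # Resident set size of this worker's process, in GB.
    import os
    import psutil
    return psutil.Process(os.getpid()).memory_info().rss / 1e9

# client.run executes the function on every worker and returns {worker address: value}.
memory_per_worker = client.run(worker_rss_gb)
print(memory_per_worker)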