Hi!
I have a problem using @ian’s code (task-group-statistics-plugin), maybe you can help
My plugin now looks as following
class TaskCounter(SchedulerPlugin):
def __init__(self, task_names: List):
self.task_names = task_names
self.groups = {}
self.scheduler = None
async def get_task_stats(self, comm):
df = pd.DataFrame.from_dict(self.groups, orient="index")
return df
def start(self, scheduler):
"""Called on scheduler start as well as on registration time"""
self.scheduler = scheduler
scheduler.handlers["get_task_stats"] = self.get_task_stats
def transition(self, key: str, start, finish, *args, **kwargs):
# Uncomment this in your workflow for getting all the tasks
# logging.info(f"Key: {key}")
prefix_name = key_split(key)
group_name = key_split_group(key)
if prefix_name in self.task_names:
if start == 'processing' and finish == 'memory':
# Add the stats
if group_name not in self.groups:
self.groups[group_name] = {}
group = self.scheduler.task_groups[group_name]
self.groups[group_name]["prefix"] = prefix_name
self.groups[group_name]["duration"] = group.duration
self.groups[group_name]["start"] = str(datetime.fromtimestamp(group.start))
self.groups[group_name]["stop"] = str(datetime.fromtimestamp(group.stop))
self.groups[group_name]["nbytes"] = group.nbytes_total
def restart(self, scheduler):
self.groups = {}
Mostly like Ian’s one. The problem is that, weh I want to recover the stats (after compute() has finished, but before client.close() is called), I run
stats_data = client.sync(client.scheduler.get_task_stats)
And I get the following error stack
tornado.application - ERROR - Exception in callback functools.partial(<bound method IOLoop._discard_future_result of <tornado.platform.asyncio.AsyncIOLoop object at 0x7f7a19154a30>>, <Task finished name='Task-47' coro=<Server.handle_comm() done, defined at /opt/venv/lib/python3.8/site-packages/distributed/core.py:433> exception=ValueError('The truth value of a DataFrame is ambiguous. Use a.empty, a.bool(), a.item(), a.any() or a.all().')>)
Traceback (most recent call last):
File "/opt/venv/lib/python3.8/site-packages/tornado/ioloop.py", line 741, in _run_callback
ret = callback()
File "/opt/venv/lib/python3.8/site-packages/tornado/ioloop.py", line 765, in _discard_future_result
future.result()
File "/opt/venv/lib/python3.8/site-packages/distributed/core.py", line 537, in handle_comm
if reply and result != Status.dont_reply:
File "/opt/venv/lib/python3.8/site-packages/pandas/core/generic.py", line 1535, in __nonzero__
raise ValueError(
ValueError: The truth value of a DataFrame is ambiguous. Use a.empty, a.bool(), a.item(), a.any() or a.all().
Traceback (most recent call last):
File "/usr/local/lib/python3.8/runpy.py", line 194, in _run_module_as_main
return _run_code(code, main_globals, None,
File "/usr/local/lib/python3.8/runpy.py", line 87, in _run_code
exec(code, run_globals)
File "/opt/project/dl_oneo_gstp_crops/crops_yearly.py", line 188, in <module>
main()
File "/opt/project/dl_oneo_gstp_crops/crops_yearly.py", line 154, in main
stats_data = client.sync(client.scheduler.get_task_stats)
File "/opt/venv/lib/python3.8/site-packages/distributed/utils.py", line 309, in sync
return sync(
File "/opt/venv/lib/python3.8/site-packages/distributed/utils.py", line 376, in sync
raise exc.with_traceback(tb)
File "/opt/venv/lib/python3.8/site-packages/distributed/utils.py", line 349, in f
result = yield future
File "/opt/venv/lib/python3.8/site-packages/tornado/gen.py", line 762, in run
value = future.result()
File "/opt/venv/lib/python3.8/site-packages/distributed/core.py", line 905, in send_recv_from_rpc
return await send_recv(comm=comm, op=key, **kwargs)
File "/opt/venv/lib/python3.8/site-packages/distributed/core.py", line 674, in send_recv
response = await comm.read(deserializers=deserializers)
File "/opt/venv/lib/python3.8/site-packages/distributed/comm/inproc.py", line 199, in read
raise CommClosedError()
distributed.comm.core.CommClosedError
I made a workaround overloading the remove_client and the restart methods for showing the stats in that moment (as in my first code), but that solution is much less “polite”, as I cannot poll for the stats exactly when I want
Thank you!