Hi!
I’m trying to implement a rather complicated workflow that involves remote-controlling a MATLAB session (a non-serializable object). When multiple tasks act on that session and more than one worker is in use, an error occurs.
```python
from dask import delayed
from dask.graph_manipulation import bind
from distributed import Client

client = Client()  # cluster with more than one worker

# session_start/session_execute/session_close wrap the MATLAB session
task_batch1 = ['command_a', 'command_b', 'command_c']
task_batch2 = ['command_d', 'command_e', 'command_f']

session = delayed(session_start)()
delayed_batch1 = [delayed(session_execute)(session, task) for task in task_batch1]
delayed_batch2 = [bind(delayed(session_execute), delayed_batch1)(session, task) for task in task_batch2]
close = bind(delayed(session_close), delayed_batch2)(session)
client.compute(close)
```
In the example above, I’m (i) creating a session, (ii) executing a first batch of tasks on it, (iii) executing a second batch after the first batch finishes, and (iv) closing the session after the second batch finishes. However, dask understandably attempts to distribute the first batch among the workers, which is not possible since the session cannot be serialized and shared. Is there a way to make explicit where work is supposed to happen? E.g. can I fuse `delayed_batch1` into a single task, or declare that `session` and all of its dependents should run on the same worker? Two rough sketches of what I have in mind follow below.
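To make the question concrete, here are two untested sketches of what I imagine; `execute_batch` is a hypothetical helper of my own, and I don’t know whether either approach is the idiomatic way:

```python
# Sketch A: "fuse" a batch by hand, so that all commands of a batch run
# inside one task and the session never has to leave that worker.
def execute_batch(session, tasks):  # hypothetical helper
    return [session_execute(session, task) for task in tasks]

batch1 = delayed(execute_batch)(session, task_batch1)

# Sketch B: pin the whole graph to a single worker at submission time.
addr = list(client.scheduler_info()["workers"])[0]  # pick any one worker
future = client.compute(close, workers=[addr], allow_other_workers=False)
```

Sketch A still leaves `session` free to be created on a different worker than the one running `batch1`, which is why I suspect something like Sketch B would be needed as well.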
Pinning work to a worker seems to be possible with futures, but I need the `dask.graph_manipulation.bind` utility, which seems to be available only for `delayed` objects. I also found `dask.optimization.fuse` along with this example on SO, but could not see the connection to my use case. The closest I got with futures is sketched below.
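For reference, this is the futures-based workaround I’ve been considering; the `execute_after` wrapper and the explicit worker address are my own hypothetical additions, and I’m not sure it’s sound:

```python
# Hypothetical emulation of bind() with futures: the *deps arguments are
# ignored, but passing the previous futures to client.submit makes them
# dependencies, so these tasks only start once the prior batch has finished.
def execute_after(session, task, *deps):
    return session_execute(session, task)

addr = list(client.scheduler_info()["workers"])[0]  # keep everything on one worker

session_f = client.submit(session_start, workers=[addr])
batch1_f = [client.submit(session_execute, session_f, task, workers=[addr])
            for task in task_batch1]
batch2_f = [client.submit(execute_after, session_f, task, *batch1_f, workers=[addr])
            for task in task_batch2]
close_f = client.submit(lambda s, *deps: session_close(s), session_f, *batch2_f,
                        workers=[addr])
close_f.result()
```

This feels like re-implementing `bind` by hand, though, so I’d be glad to learn the intended pattern.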
Your help is very much appreciated, thanks a lot in advance!