Hi @scharlottej13, thanks for your response!
I would like to take this opportunity to ask a question: is there any way to make a dask_jobqueue.JobQueueCluster launch and supervise an arbitrary job file? Essentially, I am wrapping the derived classes to replace the call to dask-worker with another script that writes its output to disk (as a pickle), which my main thread then reads back. I know this is not the most elegant approach, but I am doing it this way because I need my delayed function to be able to use multiprocessing.Pool. Since this doesn’t seem to be supported at the moment, I am settling for launching an asynchronous blocking process that writes the output to disk. The main problem is that sbatch/qsub/condor_submit etc. are all non-blocking, so I have to write code on top of each class to make the call blocking and asyncio-friendly.
Do you have any ideas on how to optimize this process? What I essentially did was replace the dask.distributed.Client with a Client of my own while keeping the same usage pattern as dask, in the hope that if dask provides a better solution in the future, I won’t have to refactor much. Any ideas are welcome!
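For readers following along, here is a minimal sketch of the "blocking but asyncio-friendly" submission described above. It is not the poster's actual code: `run_and_wait`, the semaphore limit of 4, and the stand-in commands are all assumptions made for illustration. On Slurm, a command such as `sbatch --wait job.sh` (where `job.sh` is a hypothetical job file) could take the place of the stand-in, since `--wait` makes `sbatch` return only after the job terminates.

```python
import asyncio
import sys


async def run_and_wait(cmd, semaphore):
    """Run `cmd` and return (returncode, stdout) once the process has exited."""
    async with semaphore:  # cap the number of subprocesses open at once
        proc = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        # communicate() waits for exit without blocking the event loop
        stdout, _ = await proc.communicate()
        return proc.returncode, stdout.decode().strip()


async def main():
    semaphore = asyncio.Semaphore(4)  # illustrative limit, not from the post
    # Stand-in commands; a scheduler submit command would go here instead.
    cmds = [[sys.executable, "-c", f"print({i} * 2)"] for i in range(3)]
    return await asyncio.gather(*(run_and_wait(c, semaphore) for c in cmds))


if __name__ == "__main__":
    print(asyncio.run(main()))
```

The semaphore is what bounds the number of open pipes, so the limit can be tuned to the file descriptor budget of the submitting machine.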
@jreniel Welcome to Discourse! I’ve moved your question to a fresh topic just for better isolation of this discussion.
Thanks!
I actually have a “working” prototype. What I did was replace dask.distributed.Client with a completely custom Client, and reuse some parts of dask_jobqueue as inputs for my custom Client. The bottom line is that the Client replaces dask_jobqueue.core.Job.submit_command with a blocking version that can be submitted as an asynchronous task. It also uses an asyncio.Semaphore internally to protect against too many open file descriptors. Essentially, the interface looks like this:
    from . import my_external_script

    async with config.cluster.Cluster(**config.cluster.kwargs) as cluster:
        async with Client(config, scheduler=cluster, asynchronous=True, semaphore=config.cluster.semaphore) as client:
            outfile = 'test.pkl'
            client.gather(
                client.enqueue(
                    [
                        my_external_script.__file__,
                        f'-o={outfile}',
                        '--nprocs=12',
                    ],
                    job_cpu=12,  # use multi threading in script
                )
            )
            # read pickle file and continue
So the part that is not “automated” is how to “return” from the script that you are calling. Essentially, explicit IO has to be done to gather the outputs, rather than the implicit model that dask provides.
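To make the explicit IO step concrete, here is a small sketch of the round trip: a script pickles its result to a path passed with `-o`, and the caller unpickles it after the script exits. The `SCRIPT` body and `run_and_load` are hypothetical stand-ins, not the real external script or the custom Client.

```python
import pickle
import subprocess
import sys
import tempfile
from pathlib import Path

# Hypothetical stand-in for the external script: it computes a value and
# pickles it to the path given with -o.
SCRIPT = """
import argparse, pickle
parser = argparse.ArgumentParser()
parser.add_argument("-o", dest="outfile", required=True)
args = parser.parse_args()
with open(args.outfile, "wb") as fh:
    pickle.dump({"squares": [i * i for i in range(5)]}, fh)
"""


def run_and_load(outfile):
    """Run the script to completion, then unpickle what it wrote."""
    subprocess.run([sys.executable, "-c", SCRIPT, "-o", str(outfile)], check=True)
    with open(outfile, "rb") as fh:
        return pickle.load(fh)


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        print(run_and_load(Path(tmp) / "test.pkl"))
```

Because `check=True` raises if the script fails, the caller never tries to read a pickle that was not written.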
Hi @jreniel,
I’ve not read the other post yet, but I’ll try to answer this one.
I’m not sure I totally get what you’re trying to do, but an important point is that dask-jobqueue is only a Dask cluster manager (like LocalCluster, dask-kubernetes, and plenty of others). Dask-jobqueue is not designed to submit arbitrary tasks, and it never will be.
For that, you want to use Dask itself once it has been started through dask-jobqueue or any other cluster manager.
So is there a reason why you don’t use Dask to launch this blocking task? For the multiprocessing pool thing, you should be able to Popen a process from a task submitted to Dask. Would this work for you?
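A rough sketch of that suggestion, under stated assumptions: the task body starts a fresh Python interpreter with `subprocess.run`, and that child is free to create its own `multiprocessing.Pool` because it is not a daemonic multiprocessing process. `CHILD`, `pool_in_subprocess`, and the choice of `math.factorial` as the workload are illustrative names only; `main()` additionally assumes Dask is installed and uses a `LocalCluster` in place of a dask-jobqueue cluster.

```python
import json
import subprocess
import sys

# Child program: runs in a fresh interpreter, so it may create its own Pool.
CHILD = """
import json, math, sys
from multiprocessing import Pool

if __name__ == "__main__":
    n = int(sys.argv[1])
    with Pool(2) as pool:
        print(json.dumps(pool.map(math.factorial, range(n))))
"""


def pool_in_subprocess(n):
    """Task body: launch the child, wait for it, and parse its JSON output."""
    done = subprocess.run(
        [sys.executable, "-c", CHILD, str(n)],
        capture_output=True,
        text=True,
        check=True,
    )
    return json.loads(done.stdout)


def main():
    # Imported here so the helper above stays usable without Dask installed.
    from dask.distributed import Client, LocalCluster

    with LocalCluster(n_workers=1, threads_per_worker=1) as cluster:
        with Client(cluster) as client:
            future = client.submit(pool_in_subprocess, 5)
            return future.result()
```

Calling `main()` from a script should be done under an `if __name__ == "__main__":` guard, since `LocalCluster` starts worker processes.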
Thanks Guillaume,
The workflow that I have has multiple stages, and in the different stages I have needed to use different strategies. For this particular part, I need to submit a function that uses multiprocessing.Pool, but when launched from a dask-worker, we get an error: daemonic processes are not allowed to have children. This happens when a daemonic process, such as a multiprocessing.Pool worker, tries to start a Pool of its own. I think I am happy with the custom Client I wrote, and it solves my problem. dask-jobqueue will never officially launch an arbitrary job, but it can be coerced into doing so quite simply by replacing the Client.
On the other hand, I have other parts of my workflow where I am using the dask.distributed.Client directly, but as I try to use the async mode, I always get ValueError: The future belongs to a different loop than the one specified as the loop argument. Naturally, I already tried passing loop=asyncio.get_event_loop() to the context manager calls, but still… It looks like dask doesn’t play well when the user is using async… I guess this is a different issue than the one posted here, but since I already have your attention, I might as well try… My workaround (which I haven’t tested yet) is to wrap synchronous dask in a loop.run_in_executor() call. Any advice will also be appreciated!
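The executor workaround can be sketched with the standard library alone. This is only an illustration of the pattern, not something tested against Dask: `run_stage` is a hypothetical blocking function standing in for a stage that would use a synchronous Dask client, and the stage names and sleep times are made up.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor


def run_stage(name, seconds):
    """Hypothetical blocking stage; a synchronous Dask workflow would go here."""
    time.sleep(seconds)
    return f"{name} done"


async def main():
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=2) as executor:
        # Each blocking stage runs in its own thread; the event loop stays free.
        stages = [
            loop.run_in_executor(executor, run_stage, "stage_a", 0.1),
            loop.run_in_executor(executor, run_stage, "stage_b", 0.1),
        ]
        return await asyncio.gather(*stages)


if __name__ == "__main__":
    print(asyncio.run(main()))
```

`asyncio.gather` returns results in the order the awaitables were passed, so the caller can match outputs to stages regardless of which one finishes first.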
-J.
I’m awful at async things…
But I wouldn’t use async calls to Dask from the client side. I’m not sure in which case it would be needed. The Dask Client and Dask cluster are already in a kind of async mode when using the delayed and futures APIs. Doesn’t your workflow fit with those? Do you have a simple example?
Hi @guillaumeeb,
asyncio is way too powerful to give up… As I’ve mentioned, my workflow involves several steps, some of which can run concurrently. That is, I’m not just starting one dask Client, I’m actually starting multiple dask clients, each of them working on a different aspect of the workflow. In the end, once these dask-workers have prepared the data, then they go into a single threaded final call that uses all of these dask-created components … If you are curious about what I am doing, I am an oceanographer/numerical modeler and I am creating a finite element mesh from geodata… Parallelization is used for the pre-processing part of the mesh generator, the mesh generator itself doesn’t use dask (or parallelization). But the pre-processing is actually expensive, so that’s how dask came into the picture …
To be honest, async is very simple to use, and waaay too powerful to give up. If I could at least solve the event loop issue, it would be nice… Note that the workflow for async is almost identical to what I have above, except that it uses the dask Client instead of my custom client… Anyway, I think I can work with launching an executor, and I’ll be testing that this week… Thanks for your advice and support, I really appreciate it and it has definitely cleared up some of my questions!
-J.