Is there a way to specifically process some dask delayed jobs with threads vs processes?
@dask.delayed def plot(): ... # matplotlib job that needs processes because matplotlib is not thread safe @dask.delayed def image_manip(): ... # imageio job that only needs threads because it's I/O bound
Would this work?
with dask.annotate(scheduler="threads"): plot() with dask.annotate(scheduler="processes): image_manip()
Here’s a minimal example, but it doesn’t seem to do what I want:
import dask import logging # set up logging to console console = logging.StreamHandler() console.setLevel(logging.DEBUG) # set a format which is simpler for console use formatter = logging.Formatter('%(process)s %(threadName)s %(name)s %(message)s') console.setFormatter(formatter) # add the handler to the root logger logging.getLogger('').addHandler(console) logger = logging.getLogger(__name__) @dask.delayed def test(x): logger.warning(x) return x with dask.annotate(scheduler="threads"): a = test("a") b = test("b") with dask.annotate(scheduler="processes"): c = test("c") jobs = [a, b, c] dask.compute(jobs)
c to show ProcessPoolExecutor, but get all threadpools using the same process
9287 ThreadPoolExecutor-0_0 __main__ a 9287 ThreadPoolExecutor-0_1 __main__ c 9287 ThreadPoolExecutor-0_2 __main__ b (['a', 'b', 'c'],)