Is it possible to use multiple clients concurrently?

Hi all,
I am new to the group. I was wondering if two or more Dask clients can be used concurrently. Below is example code where a cluster is used 10 times to call ‘doTheJob’ 1000 times. I am wondering if the code can be further parallelized by starting another cluster/client pair that handles the for loop over x. I would appreciate any support.
Thanks.
Bilgin

import pandas as pd
import dask
import dask.dataframe
from dask.distributed import Client

def doTheJob(i):
    return pd.DataFrame(
        {'i': [i],
         'i^2': [i * i]})

# `cluster` is assumed to already exist, e.g. a LocalCluster
for x in range(10):
    with Client(cluster) as client:
        jobs = [dask.delayed(doTheJob)(i) for i in range(1000)]
        data = dask.dataframe.from_delayed(jobs).compute()

==========================================================
I have tried the following or versions of it unsuccessfully for the problem stated above.

def useClient():
    with Client(cluster) as client:
        jobs = [dask.delayed(doTheJob)(i) for i in range(1000)]
        data = dask.dataframe.from_delayed(jobs).compute()
    return data

with Client(cluster1) as client1:
    jobs1 = [dask.delayed(useClient)() for x in range(10)]
    data = dask.dataframe.from_delayed(jobs1).compute()

Hi @baltun and welcome to Discourse!

It is possible to run computations on two clusters simultaneously in Dask. However, it’d be great to know a bit more about your setup, as this is not usually necessary for improving efficiency.

If you do want to use two cluster instances, note that calling dask.dataframe.from_delayed(jobs1).compute() will use the most recently created client; distributed.get_client will find the correct one (see this Stack Overflow answer). Also note that compute is a blocking operation (see the docs on managing computation), which may affect when you want to call it.
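
For example, here’s a minimal sketch (the two LocalCluster instances and the names cluster1/cluster2 are just illustrative) of targeting a specific cluster’s scheduler with get_client:

from dask.distributed import Client, LocalCluster, get_client

# two independent clusters; client2 is the most recently created,
# so a bare .compute() would default to it
cluster1 = LocalCluster()
cluster2 = LocalCluster()
client1 = Client(cluster1)
client2 = Client(cluster2)

# passing a scheduler address returns a client connected to that
# specific cluster, regardless of which client was created last
c1 = get_client(cluster1.scheduler_address)
print(c1.submit(sum, [1, 2, 3]).result())  # 6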

The other options I’d recommend are:

  1. Run your computation in batches (see these best practices on avoiding too many tasks); a sketch follows below.
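
For example, here’s a minimal sketch of batching (the LocalCluster and the batch size of 100 are arbitrary choices for illustration) that turns 1000 tiny tasks into 10 larger ones:

import pandas as pd
import dask
import dask.dataframe as dd
from dask.distributed import Client, LocalCluster

def doTheJobBatch(batch):
    # one task builds a whole batch of rows instead of one row each
    return pd.DataFrame({'i': batch, 'i^2': [i * i for i in batch]})

cluster = LocalCluster()
with Client(cluster) as client:
    batches = [list(range(start, start + 100)) for start in range(0, 1000, 100)]
    jobs = [dask.delayed(doTheJobBatch)(b) for b in batches]
    data = dd.from_delayed(jobs).compute()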

  2. If the data can fit into memory on the client, remove the delayed step and use dask.dataframe directly:

import dask
import dask.dataframe as dd
from dask.distributed import Client, LocalCluster
import pandas as pd

def doTheJob(n):
    return dd.from_pandas(
        pd.DataFrame(
            {'i':[i for i in range(n)],
             'i^2': [i*i for i in range(n)]}
        ), npartitions=2)

cluster = LocalCluster()

with Client(cluster) as client:
    jobs = [doTheJob(1000) for x in range(10)]
    # computing returns a tuple of 10 pandas DataFrames
    data = dask.compute(*jobs)

  3. Compute on many computations at once, per these best practices (this snippet may not necessarily be the best way; that will depend on exactly what you’re doing):

import dask.dataframe as dd
import pandas as pd
import dask
from dask.distributed import Client, LocalCluster

def doTheJob(i):
    return pd.DataFrame({'i':[i], 'i^2': [i*i]})

cluster = LocalCluster()
client = Client(cluster)

# ten independent lists, each holding 1000 delayed calls
jobs = [[dask.delayed(doTheJob)(i) for i in range(1000)] for x in range(10)]
# submit one dd.from_delayed call per list, then collect the results
data = client.map(dd.from_delayed, jobs)
results = client.gather(data)
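
Note that client.map here runs the dd.from_delayed calls themselves as tasks, so gather should hand back the ten Dask DataFrames still in lazy form; you’d then compute them to materialize the results.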