Hi all,
I am new to the group. I was wondering whether two or more Dask clients can be used concurrently. Below is example code where a cluster is used 10 times to call doTheJob 1000 times each. I am wondering if the code can be parallelized further by starting another cluster/client pair to handle the for loop over x. I would appreciate any support.
Thanks.
Bilgin
import pandas as pd
import dask
import dask.dataframe
from dask.distributed import Client

# doTheJob and cluster are defined elsewhere in my setup
for x in range(10):
    with Client(cluster) as client:
        jobs = [dask.delayed(doTheJob)(i) for i in range(1000)]
        data = dask.dataframe.from_delayed(jobs).compute()
==========================================================
I have tried the following, and variations of it, unsuccessfully for the problem stated above.
def useClient():
    with Client(cluster) as client:
        jobs = [dask.delayed(doTheJob)(i) for i in range(1000)]
        data = dask.dataframe.from_delayed(jobs).compute()
    return data

with Client(cluster1) as client1:
    jobs1 = [dask.delayed(useClient)() for x in range(10)]
    data = dask.dataframe.from_delayed(jobs1).compute()
It is possible to run computations on two clusters simultaneously in Dask. However, it'd be great to know a little bit more about your setup, as this is usually not necessary for improving efficiency.
If you do want to use two cluster instances, calling dask.dataframe.from_delayed(jobs1).compute() will use the most recently created client; distributed.get_client will find the correct one (see this Stack Overflow answer). Also note that compute is a blocking operation (see the docs on managing computation), which may affect when you want to call it.
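For illustration, here's a minimal sketch (the two LocalClusters and the which_scheduler helper are purely illustrative, not from your code) of how get_client called inside a task finds the client for whichever cluster is running that task, and how client.compute returns a future immediately rather than blocking:

import pandas as pd
import dask.dataframe as dd
from dask.distributed import Client, LocalCluster, get_client

def which_scheduler():
    # Inside a task, get_client() returns a client connected to the
    # scheduler of the worker running this task, not the most recently
    # created client in the main process.
    return get_client().scheduler.address

with Client(LocalCluster(n_workers=1)) as client1, \
     Client(LocalCluster(n_workers=1)) as client2:
    # Submit explicitly through each client rather than relying on the
    # implicit "current" client (which would be client2 here).
    print(client1.submit(which_scheduler).result())  # scheduler of cluster 1
    print(client2.submit(which_scheduler).result())  # scheduler of cluster 2

    # client.compute is non-blocking: it returns a Future right away,
    # unlike the blocking .compute() method on a collection.
    ddf = dd.from_pandas(pd.DataFrame({'i': list(range(10))}), npartitions=2)
    print(client1.compute(ddf).result())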
One option, if the data can fit into memory on the client, could be to remove the delayed step and use dask.dataframe directly:
import dask
import dask.dataframe as dd
from dask.distributed import Client, LocalCluster
import pandas as pd

def doTheJob(n):
    # build a Dask DataFrame directly from pandas, with no delayed step
    return dd.from_pandas(
        pd.DataFrame(
            {'i': [i for i in range(n)],
             'i^2': [i*i for i in range(n)]}
        ), npartitions=2)

cluster = LocalCluster()
with Client(cluster) as client:
    jobs = [doTheJob(1000) for x in range(10)]
    # computes all ten graphs at once; returns a tuple of 10 pandas DataFrames
    data = dask.compute(*jobs)
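Since doTheJob here already returns Dask DataFrames, the single dask.compute(*jobs) call hands all ten graphs to the scheduler together, instead of blocking on ten separate .compute() calls in a loop.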
Another option is to compute many results at once, per these best practices (this snippet may not necessarily be the best way; that will depend on exactly what you're doing):
import dask.dataframe as dd
import pandas as pd
import dask
from dask.distributed import Client, LocalCluster

def doTheJob(i):
    return pd.DataFrame({'i': [i], 'i^2': [i*i]})

cluster = LocalCluster()
client = Client(cluster)

# ten batches of 1000 delayed pandas DataFrames each
jobs = [[dask.delayed(doTheJob)(i) for i in range(1000)] for x in range(10)]
data = client.map(dd.from_delayed, jobs)
results = client.gather(data)
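If I'm reading that last snippet correctly, client.map runs dd.from_delayed on the workers, so results comes back as a list of ten Dask DataFrames that are still lazy; you would still call .compute() on them (or pass them to dask.compute) to get concrete pandas DataFrames.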