I want to load data in a distributed manner. I want to read the dataset in parallel with each worker loading roughly the same amount of data. I want some pointer mapping which worker is having what data partition (or like pointer to the worker’s local data) and I want to persist the data on the workers. The complete dataset cannot be loaded in one worker’s memory. Hence the data has to be sharded across workers, but I want to have the reference to which worker has what data.
How can I achieve this using Dask?
Hi @vigneshn1997, this seems like a follow-up to your previous post, thanks for bringing it up again! Dask has a number of ways to read and process data in parallel, can you provide more information on what kind of data you’re using and where it is stored? In the meantime, you may find this example is similar to your workflow and there’s also this reference for connecting to remote data.
Hi @scharlottej13 . Thank you for your reply In my case, the data will be stored in a shared network file system (accessible by all nodes a network). To start with, I want to work with tabular data (loaded as dask arrays/dask dataframes) stored in csv/parquet files.
I went through the example shared. For my use case, I want to ensure that the data loaded is compatible to be trained using Tensorflow/PyTorch. And for my requirement, I specifically want to ensure persistence of data on workers and get a future handle of the data for each worker.
To explain the workflow,
Lets say I have 4 dask workers.
I want to load my data, divide the data among the 4 workers (randomly shuffling the data and then dividing it)
Now, I want to iteratively send a task to run on each of the workers local data. So lets say the order of workers visited is 2,1,3,4 (visiting sequentially one after the other) for the first iteration. For the next iteration lets say it is 3,1,4,2 => in this iteration, I don’t want the data to be loaded again (hence I want the data to be persisted across iterations).
And here we can see the need, that when I call client.submit (…, workers=w) : I can use the workers argument to send my task to a particular worker, but how do I pass the future object of the worker’s local data (to make sure that the task runs only on worker w’s data)
Hi @vigneshn1997, thanks for providing more detail! You can control which tasks run on which worker using the workers
argument on client.{submit, persist, compute}
. For example, if you have a task which loads some part of your data, you can capture that future, and use it in other computations. Here’s a small pseudocode example:
def load_data(path, partition):
data = read_some_data_from_network_filesystem(path, partition)
def process_data(data):
return do_things(data)
client = distributed.Client()
workers = client.scheduler_info()['workers'].keys()
worker1_url = workers[0]
worker2_url = workers[1]
path = "/path/on/my/network"
fut1 = client.submit(load_data, path, 1, workers=worker1_url)
fut2 = client.submit(load_data, path, 2, workers=worker2_url)
result1 = client.submit(process_data, fut1, workers=worker1_url)
result2 = client.submit(process_data, fut2, workers=worker2_url)
Thank you @scharlottej13 . I will try this out and get back to you on this
I had one doubt, will the load_data return the data (in the form of dask dataframe?) or does it only load the data and not return anything?
Hi @vigneshn1997! Did this suggested approach work for you?
Hi @scharlottej13. The approach worked for me when I am returning the pandas dataframe directly from the load data function and then passing the future as argument to the process data function. Thank you for the template.
Just one question, will the data be persisted on the worker if I run process data again or will it be re loaded? I couldnt get the difference on a smaller dataset(yet to test it on a larger dataset)
Glad to hear this worked for you! Yes, when using client.submit
, Dask.distributed will keep the results in memory, rather than reloading the data again. You can read more on this here.
1 Like