Issue in Parallel row preprocessing with Dask

Hi,

Sorry, I got busy with a few other things and have only just been able to get back to saving Dask dataframe partitions as files. Let me explain my flow; I am still running into some issues and would appreciate your help with them. I opened a new discussion because I thought the whole flow would benefit from its own thread, and also because the issue is of a different type from the one in the above post (if this is not OK, I can move the query as a reply to the above question instead). I don’t want to publish the dataset, because I don’t want the entire data to be visible to all Dask workers; I only want a worker’s local data to be accessible to it.

[flow diagram]

The flow shown in the diagram above is explained below.

  1. I have a CSV file containing, let’s say, image paths (the images are present in some remote storage).
  2. I want to shuffle the data and shard it roughly equally among the Dask workers. For this I am doing the following (see Dask shuffling between partitions):
import pandas as pd
import dask.dataframe as dd

# read the CSV locally, convert to a Dask dataframe, then shuffle and re-shard
df = pd.read_csv(csv_file_path)
_df = dd.from_pandas(df, npartitions=num_workers)
shuffled_df = _df.sample(frac=1)
sharded_df = shuffled_df.repartition(npartitions=num_workers)
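(Side note: a quick way to check that the shards come out roughly balanced, using the sharded_df above, would be something like:)
# one row count per partition; the computed result is small
partition_sizes = sharded_df.map_partitions(len).compute()
print(partition_sizes)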
  3. Now each worker has to work on its own data shard, do some row-wise processing on its data, and save the processed rows on its local persistent storage (see the Map_partitions question for image processing):
transformed_data = sharded_df.map_partitions(
    lambda part: part.apply(process_row, args=(data_info, params, row_routine, kwargs), axis=1),
    meta=('transformed_data', object)
)

  • What process_row does is first download the files required to process the row from remote storage (in this case the image) and then call the row_routine method. The row_routine method does the actual row processing (let’s say converting the downloaded image to a numpy array/tensor). A rough sketch of what these two functions might look like is included after the flow code below.
  4. Now I want to save the processed partition on each worker’s local storage:
import os
from pathlib import Path
from dask.distributed import get_worker

def save_partition(partition_df, params):
    # create the worker-local output directory and pickle this partition there,
    # naming the file after the worker id
    Path(params.output_path).mkdir(parents=True, exist_ok=True)
    path = os.path.join(params.output_path,
                        "data" + str(get_worker().id) + ".pkl")
    partition_df.to_pickle(path)
    return path  # return only the small path string, not the data

temp = transformed_data.map_partitions(lambda part: save_partition(part, params), meta=('temp', object))
temp.compute()
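
For context, here is a rough sketch of what process_row and row_routine look like in my flow (simplified; the image_path column name, params.local_tmp_dir and the fsspec-based download are stand-ins for my actual helpers):
import os
import fsspec  # generic filesystem interface (s3://, gs://, etc.)
import numpy as np
from PIL import Image

def process_row(row, data_info, params, row_routine, kwargs):
    # download the file referenced by this row from remote storage to a
    # worker-local path, then run the row-level processing on it
    # (data_info is unused in this simplified sketch)
    local_path = os.path.join(params.local_tmp_dir, os.path.basename(row["image_path"]))
    with fsspec.open(row["image_path"], "rb") as src, open(local_path, "wb") as dst:
        dst.write(src.read())
    return row_routine(local_path, **kwargs)

def row_routine(local_path, **kwargs):
    # example processing: load the downloaded image as a numpy array
    return np.asarray(Image.open(local_path))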

When I run this flow it works for smaller datasets, i.e. the workers save their partitions on their local storage. But for larger datasets (let’s say 6-7 GB of images) on an 8-node cluster with 16 workers per node (so 128 Dask workers in total), the workers get killed.

The scheduler logs mention this:

distributed.core - INFO - Event loop was unresponsive in Scheduler for 26.22s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.

I know we are moving files over the network, and I only want each worker to save its own locally processed partition. But I think calling temp.compute() may be overloading the scheduler; could it be that the scheduler is trying to gather all the data onto its node? While checking the cluster statistics, I observed that the scheduler is storing 100GB+ of data while the workers hold at most 10GB of data in memory.

The error below appears in the dask worker logs:

If the above flow is not optimal, I would also like to hear thoughts on how I can optimize it and ensure the data-parallel operation can happen without overloading the scheduler.

@vigneshn1997 Thanks for your question! Would you be able to share a minimal, reproducible example?

I observed that the scheduler is storing 100GB+ of data while the workers are having at max 10GB of data in memory.

Could you please share where you’re observing this? Is it through the dashboard?

I don’t exactly have a minimal reproducible example, since the issue only occurs on large datasets. I am observing this usage in the Grafana/Prometheus dashboard, where I can see the cluster memory usage.

But I found a way to solve the issue: I used client.submit to submit the data loading and saving tasks to the workers. This ensures that .compute() is not called, so the computed data is not pulled back through the Dask scheduler. I am testing this solution now.
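
Roughly, the new approach looks like the sketch below (simplified; scheduler_address, shards, params and process_and_save_shard are placeholders for my actual setup, and each shard is a plain pandas dataframe):
from dask.distributed import Client

client = Client(scheduler_address)

# one shard per worker, each task pinned to a specific worker so the data
# it downloads and saves stays local to that worker
workers = list(client.scheduler_info()["workers"])
futures = []
for shard, worker in zip(shards, workers):
    fut = client.submit(process_and_save_shard, shard, params,
                        workers=[worker], pure=False)
    futures.append(fut)

# gather only the small return values (e.g. output paths), not the processed data
saved_paths = client.gather(futures)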