Reading data (and image data) from HDFS for training

I’m working on a bachelor’s thesis on machine learning, and we are using Dask for data loading.
We are using PyTorch, TensorFlow, and RAPIDS.

We have a cluster running dask-worker processes and an HDFS file system with replication. The Dask workers sit on the HDFS data nodes.

I found in the documentation that data can be loaded from HDFS like this:

    dask.dataframe.read_csv('hdfs:///path/to/files.*.csv', blocksize="256MB")
    dask.dataframe.read_parquet('hdfs://path…:1234/data/*')  # 1234 is the port

(Frequently Asked Questions — Dask.distributed 2023.9.2 documentation and Connect to remote data — Dask documentation)
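
For reference, here is a cleaned-up sketch of how I call it; the host, port, and user in storage_options are placeholders for our cluster settings:

    import dask.dataframe as dd

    # Placeholders: replace host/port/user with the cluster's NameNode settings.
    df = dd.read_csv(
        "hdfs:///path/to/files.*.csv",
        blocksize="256MB",
        storage_options={"host": "namenode-host", "port": 8020, "user": "hdfs-user"},
    )
    print(df.npartitions)  # roughly one partition per 256 MB of input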

The result is that I get a DataFrame with many partitions.

If every worker executes this function:

    def train():
        dataframe = dask.dataframe.read_csv('hdfs:///path/to/files.*.csv', blocksize="256MB")
        local_dataframe = dataframe.get_partition(x)  # x is the number of the partition

then every worker gets a DataFrame with many partitions. Does every worker use its own local partitions, without sending partitions/data over the network? That is what I could not figure out, and I want to be sure it really works like that.

If I read many files or one large file from HDFS, does Dask create the partitions from locally available data?

My second question is: what is the best way to iterate through the part of a DataFrame that one worker holds locally, without accessing partitions that are not local? Let's say the DataFrame has 20 partitions and I have 4 nodes, each with one dask-worker, so each node holds 5 partitions.

I want to build a DataFrame from these 5 local partitions, so that I can get its length and iterate through it. But I can't list the partition numbers up front; I have to check them with client.has_what() after persisting the loaded DataFrame with df = df.persist().
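
What I have in mind is roughly the following sketch; the scheduler address and the train_on_partitions function are placeholders, and I'm not sure this is the intended way to do it:

    import pandas as pd
    import dask.dataframe as dd
    from distributed import Client
    from distributed.client import futures_of

    client = Client("scheduler-address:8786")          # placeholder address

    df = dd.read_csv("hdfs:///path/to/files.*.csv", blocksize="256MB")
    df = df.persist()                                  # partitions now live on specific workers

    # Group the partition futures by the worker that holds them.
    partitions_per_worker = {}
    for fut in futures_of(df):                         # one future per partition
        (addresses,) = client.who_has([fut]).values()  # workers holding this partition
        partitions_per_worker.setdefault(addresses[0], []).append(fut)

    def train_on_partitions(*parts):
        # Runs on one worker; `parts` are the pandas partitions stored there.
        local_df = pd.concat(parts)
        return len(local_df)                           # placeholder for the real training step

    # Pin each task to the worker that already holds its partitions.
    tasks = [
        client.submit(train_on_partitions, *parts, workers=[addr])
        for addr, parts in partitions_per_worker.items()
    ]
    print(client.gather(tasks))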

My last question is how to read all locally available images from an HDFS path.
I tried to read the data like this:

    array = dask.array.image.imread('hdfs://…')

but I get an error: No files found under name hdfs://…

With PyArrow I can read images like this:

    def read():
        from pyarrow import fs
        hdfs = fs.HadoopFileSystem(host, port=…, user=…)
        hdfs.open_input_file("image_name")
This way I need to provide the image name, but I don't know whether that image is locally available, because the dataset of images is spread over the cluster with replication; some images may only be on other data nodes.

To put it briefly:

  • If every worker on a data node executes read_csv('hdfs://…'), does Dask create partitions only from local data?
  • How can I create a DataFrame from another partitioned DataFrame using only the locally available partitions, if I don't know the partition numbers at the beginning?
  • How can I read all images that are local to a worker from an HDFS path?

I can provide more information and code to illustrate the situation.
I’m grateful for every comment!

Hi @AlexS, welcome to Dask community!

I'm not sure what your workflow is, but you probably don't want every worker to read the entire DataFrame (even if it's only its metadata). With Dask, you should just call the read_csv method on the client side.
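
As a rough sketch of what I mean (the scheduler address and the per-partition computation are placeholders): you read once on the client, and Dask then runs one task per partition on the workers, not on the client:

    import dask.dataframe as dd
    from distributed import Client

    client = Client("scheduler-address:8786")   # placeholder address

    # Called once, on the client: this only builds the task graph.
    df = dd.read_csv("hdfs:///path/to/files.*.csv", blocksize="256MB")

    # Each partition is processed by whichever worker runs its task.
    rows_per_partition = df.map_partitions(len).compute()
    print(rows_per_partition)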

Nope, not anymore; see @martindurant's Stack Overflow post: Does Dask communicate with HDFS to optimize for data locality? - Stack Overflow

Quite right, with the appearance of arrow’s HDFS interface, which is now preferred over hdfs3, the consideration of block locations is no longer part of workloads accessing HDFS, since arrow’s implementation doesn’t include the get_block_locations() method.

In most cases you don't want to do that, but I'm curious why you're trying to do this?

I guess the best way is to build your own function, using the PyArrow code you provided. You’ll need to list all the images, read each image inside a Delayed call, and then build a Dask Array with the from_delayed function. Here again, you won’t be able to take advantage of data locality.
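
Something along these lines, as a rough, untested sketch; the host, port, user, image directory, and the decoding with Pillow are all assumptions on my side:

    import io
    import numpy as np
    import dask
    import dask.array as da
    from PIL import Image
    from pyarrow import fs

    # Placeholders for your cluster and data layout.
    hdfs = fs.HadoopFileSystem("namenode-host", port=8020, user="hdfs-user")

    # List every image file under the directory.
    infos = hdfs.get_file_info(fs.FileSelector("/path/to/images", recursive=True))
    paths = [i.path for i in infos if i.is_file and i.path.endswith((".png", ".jpg"))]

    @dask.delayed
    def load_image(path):
        # Read one image from HDFS and decode it into a NumPy array.
        with hdfs.open_input_file(path) as f:
            return np.asarray(Image.open(io.BytesIO(f.read())))

    # Assumes all images share the same shape and dtype, inferred from the first one.
    sample = load_image(paths[0]).compute()
    arrays = [
        da.from_delayed(load_image(p), shape=sample.shape, dtype=sample.dtype)
        for p in paths
    ]
    images = da.stack(arrays)   # shape: (n_images, height, width, channels)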

Hello!

Thank you for the fast response.

My questions got answered. :+1:
The idea of iterating through the DataFrame (I might have described it poorly) was to load all local data into a DataFrame and then return rows by index (for a custom PyTorch Dataset).
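
For reference, what I had in mind was roughly this; the column names are just placeholders:

    import torch
    from torch.utils.data import Dataset

    class LocalFrameDataset(Dataset):
        """Wraps the pandas DataFrame that holds one worker's local partitions."""

        def __init__(self, local_df, feature_cols, label_col):
            # Placeholder columns; convert once so __getitem__ stays cheap.
            self.features = torch.tensor(local_df[feature_cols].to_numpy(), dtype=torch.float32)
            self.labels = torch.tensor(local_df[label_col].to_numpy(), dtype=torch.float32)

        def __len__(self):
            return len(self.labels)

        def __getitem__(self, idx):
            return self.features[idx], self.labels[idx]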

:+1: