I’m working on a bachelor’s thesis on machine learning and we are using Dask for data loading.
We are using PyTorch, TensorFlow and RAPIDS.
We have a cluster of dask-workers and an HDFS file system with replicas. The dask-workers sit on the data nodes.
I found in the documentation that data can be loaded from HDFS like this:
" dask.dataframe.read_csv(’ hdfs:///path/to/files.*.csv’ blocksize=“256MB”) "
" dask.dataframe.read_parquet(" hdfs:// path… :1234 data/ * ") # 1234 is the Port
(from "Frequently Asked Questions" in the Dask.distributed 2023.9.2 documentation and "Connect to remote data" in the Dask documentation)
The result is a dataframe with many partitions.
If every worker executes this function:
import dask.dataframe as dd

def train():
    …
    dataframe = dd.read_csv('hdfs:///path/to/files.*.csv', blocksize='256MB')
    local_data_dataframe = dataframe.get_partition(x)  # x is the number of the partition
then each worker gets a dataframe with many partitions. Does every worker use its own local partitions, without partitions/data being sent over the network? That is what I could not figure out for sure.
If I read many files or one large file from HDFS, does Dask create the partitions from locally available data?
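For reference, this is how I am currently trying to check where the partitions actually end up after loading; the scheduler address and the HDFS path are just placeholders for my setup:

import dask.dataframe as dd
from dask.distributed import Client, futures_of, wait

client = Client("tcp://scheduler-address:8786")  # placeholder scheduler address

# Read from HDFS and persist, so the partitions are materialized on the workers
df = dd.read_csv("hdfs:///path/to/files.*.csv", blocksize="256MB").persist()
wait(df)

# One future per partition; who_has() maps each key to the worker(s) holding it,
# which should show whether the partitions ended up next to their HDFS blocks
futures = futures_of(df)
print(client.who_has(futures))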
My second question is: what is the best way to iterate over the part of a dataframe that one worker holds locally, without touching partitions that are not local? Say the dataframe has 20 partitions and I have 4 nodes, each with one dask-worker, so each node holds 5 partitions.
I want to build a partitioned dataframe from those 5 partitions, i.e. a local dataframe, so that I can get its length and iterate over it. But I cannot list the partition numbers up front; I have to look them up with " client.has_what() " after persisting the loaded dataframe with df = df.persist().
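This is the direction I have been experimenting in. The assumption that futures_of() returns the futures in partition order, and the double-keyed lookup (has_what() may report the keys as strings), are my own guesses rather than anything I found in the docs:

from dask.distributed import Client, futures_of, wait
import dask.dataframe as dd

client = Client("tcp://scheduler-address:8786")  # placeholder, as above

df = dd.read_csv("hdfs:///path/to/files.*.csv", blocksize="256MB").persist()
wait(df)

# Map task keys back to partition indices; futures_of() gives one future per
# partition, which I assume is in partition order
key_to_index = {}
for i, fut in enumerate(futures_of(df)):
    key_to_index[fut.key] = i
    key_to_index[str(fut.key)] = i  # in case has_what() reports keys as strings

# has_what() maps worker address -> keys stored on that worker
for worker_addr, keys in client.has_what().items():
    local_indices = sorted(key_to_index[k] for k in keys if k in key_to_index)
    # Select only the partitions that live on this worker
    local_df = df.partitions[local_indices]
    print(worker_addr, local_df.npartitions)

What I cannot tell is whether computing len(local_df) or iterating over it afterwards is guaranteed to stay on that worker, or whether the scheduler may still move partitions around.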
My last question is how to read all locally available images from an HDFS path.
I tried to read the data like this:
" array = dask.array.image.imread('hdfs://…') "
but I get an error: No files found under name hdfs://…
With pyarrow I can read images like this:

def read():
    from pyarrow import fs
    hdfs = fs.HadoopFileSystem(host, port=…, user=…)
    hdfs.open_input_file("image_name")

This way I have to provide the image name. But I do not know whether a given image is available locally, since the dataset of images is distributed (with replicas) across the cluster and some images may only be stored on other data nodes.
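For completeness, this is how far I get with pyarrow right now: I can list everything under a directory and read the files one by one, but nothing here tells me which of these images are stored on the local data node (host, port, user and the directory are placeholders):

from pyarrow import fs

def read_all_images(host, port, user, directory):
    hdfs = fs.HadoopFileSystem(host, port=port, user=user)

    # List every file below the directory
    selector = fs.FileSelector(directory, recursive=True)
    images = {}
    for info in hdfs.get_file_info(selector):
        if info.type == fs.FileType.File:
            with hdfs.open_input_file(info.path) as f:
                images[info.path] = f.read()  # raw bytes; decoding is a separate step
    return images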
To put it briefly:
- If every worker on a data node executes " read_csv('hdfs://…') ", does Dask create its partitions only from local data?
- How can I create a dataframe from another partitioned dataframe using only the locally available partitions, if I do not know the partition numbers up front?
- How can I read, on a worker, all images from an HDFS path that are stored locally?
I can provide more information and code to illustrate the situation.
I’m grateful for every comment!