Loading Parquet file from S3 using HDFS file system

Hi,
I want to load the parquet file from s3, using the HDFS file system. All the authentication-related configuration is done.
I am using the HDFS file system because all the authentication is done automatically by the underlying system, and I can access all other cloud storage.

import dask.dataframe as dd
import fsspec


storage_options={"host": "host-ip",
                "port": "8020",
                "user": "user",
                "kerb_ticket": "/tmp/krb5cc_0"}

path = "s3a://mydata/data/warehouse/tablespace/external/hive/us_customers_parquet/000000_0"

hadoop_fs = fsspec.filesystem(protocol="hdfs", storage_options=storage_options)
df: dd = dd.read_parquet(path=path,
                engine="pyarrow",
                filesystem=hadoop_fs)

print(df.compute())

Error:

24/03/08 05:30:29 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/03/08 05:30:30 WARN shortcircuit.DomainSocketFactory: The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
hdfsOpenFile(mydata/data/warehouse/tablespace/external/hive/us_customers_parquet/000000_0): FileSystem#open((Lorg/apache/hadoop/fs/Path;I)Lorg/apache/hadoop/fs/FSDataInputStream;) error:
RemoteException: File does not exist: /user/temp/mydata/data/warehouse/tablespace/external/hive/us_customers_parquet/000000_0
    at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:87)
    at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:77)
    at org.apache.hadoop.hdfs.server.namenode.FSDirStatAndListingOp.getBlockLocations(FSDirStatAndListingOp.java:159)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:2040)
    at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:737)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:454)
    at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:533)
    at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1070)
    at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:989)
    at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:917)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1899)
    at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2894)
java.io.FileNotFoundException: File does not exist: /user/temp/mydata/data/warehouse/tablespace/external/hive/us_customers_parquet/000000_0
    at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:87)
    at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:77)
    at org.apache.hadoop.hdfs.server.namenode.FSDirStatAndListingOp.getBlockLocations(FSDirStatAndListingOp.java:159)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:2040)
    at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:737)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:454)
    at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:533)
    at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1070)
    at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:989)
    at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:917)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1899)
    at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2894)


Traceback (most recent call last):
  File "/home/venv/lib/python3.8/site-packages/dask/backends.py", line 135, in wrapper
    return func(*args, **kwargs)
  File "/home/venv/lib/python3.8/site-packages/dask/dataframe/io/parquet/core.py", line 543, in read_parquet
    read_metadata_result = engine.read_metadata(
  File "/home/venv/lib/python3.8/site-packages/dask/dataframe/io/parquet/arrow.py", line 532, in read_metadata
    dataset_info = cls._collect_dataset_info(
  File "/home/venv/lib/python3.8/site-packages/dask/dataframe/io/parquet/arrow.py", line 1047, in _collect_dataset_info
    ds = pa_ds.dataset(
  File "/home/venv/lib/python3.8/site-packages/pyarrow/dataset.py", line 785, in dataset
    return _filesystem_dataset(source, **kwargs)
  File "/home/venv/lib/python3.8/site-packages/pyarrow/dataset.py", line 475, in _filesystem_dataset
    return factory.finish(schema)
  File "pyarrow/_dataset.pyx", line 3025, in pyarrow._dataset.DatasetFactory.finish
  File "pyarrow/error.pxi", line 154, in pyarrow.lib.pyarrow_internal_check_status
  File "pyarrow/error.pxi", line 91, in pyarrow.lib.check_status
FileNotFoundError: [Errno 2] Error creating dataset. Could not read schema from 'mydata/data/warehouse/tablespace/external/hive/us_customers_parquet/000000_0'. Is this a 'parquet' file?: Opening HDFS file 'mydata/data/warehouse/tablespace/external/hive/us_customers_parquet/000000_0' failed. Detail: [errno 2] No such file or directory

Even though I have passed the s3a file path, it will try to find it in hdfs path.
Can you please help me is there any way i can use hdfs file system and read from other cloud storage.

Hi,

It seems you are in a complex situation. You have and HDFS storage system which also provides a S3 interface?

I think that if using a filesystem kwarg in the read_parquet call, Dask will try to get the file using it. You should only use the S3 interface to be able to read the files with this protocol.

Again, maybe @martindurant has more to say here.

HDFS client also supports s3 access here it is also mentioned in the docs.
https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html

Okay so if I understand correctly, you want to read data from AWS S3 using the HDFS s3a client through Dask Dataframe and fsspec. This sounds like a complex setup, and I really don’t know if you can find a correct setup to do this.

You might need the “arrow” filesystem rather than “hdfs” to do this; but honestly I don’t know what invocation it would take. Perhaps do a little experimentation with the raw arrow FS, and then the fsspec wrapper (hdfs or arrow) for them, before trying to combine it with dask too. I really didn’t know that you could authenticate to an s3 using hdfs-kerberos.