Dask Bag vs. DataFrame to load Avro data from cloud storage. Error: Access Denied: Operation ListObjectsV2

Hi, I have a data lake whose data is stored on S3.

Python version: 3.8
dask version: "dask[complete]"==2023.5.0

I have to read this data from S3 and do certain aggregations. I cannot access S3 directly and have to assume a role. For the sake of simplicity, I have granted S3FullAccess to the role that I assume in my code.
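For context, temporary credentials like the ones used below can be obtained by assuming the role via STS; a minimal sketch (the role ARN and session name are placeholders, not the real ones) looks roughly like this:

import boto3

# placeholder role ARN and session name, for illustration only
sts = boto3.client("sts")
resp = sts.assume_role(
    RoleArn="arn:aws:iam::123456789012:role/MyS3AccessRole",
    RoleSessionName="dask-avro-read",
)
creds = resp["Credentials"]
ACCESS_KEY = creds["AccessKeyId"]
SECRET_KEY = creds["SecretAccessKey"]
SESSION_TOKEN = creds["SessionToken"]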
As we know, Dask DataFrame does not have the capability to load Avro data, so we need to use Dask Bag.

I am facing a weird issue: when I try to read Avro files with Dask Bag, I get Access Denied: Operation ListObjectsV2. However, when I use a DataFrame to load other file formats like Parquet/ORC/CSV from S3, I see no error and the program loads the data correctly.

Here is code that runs properly:

import dask.dataframe as dd
import os

ACCESS_KEY='MY_KEY'
SECRET_KEY='KEY_SEC_KEY'
SESSION_TOKEN='MY_LONG_TOKEN/Gm+YYzuj46'

path = "s3://prod-v2-datalake/dc-dataload/data/warehouse/tablespace/external/hive/us_customers_icebeg/data/*"
os.environ["AWS_ACCESS_KEY_ID"]=ACCESS_KEY
os.environ["AWS_SECRET_ACCESS_KEY"]=SECRET_KEY
os.environ["AWS_SESSION_TOKEN"]=SESSION_TOKEN
os.environ["AWS_DEFAULT_REGION"] = "us-west-2"
os.environ["AWS_REGION"] = "us-west-2"

df = dd.read_parquet(path)

df.compute()

print(df.head())

I see the DataFrame load the data correctly.

However, I see the error with the following program:

import os

import dask.bag as dd


ACCESS_KEY='MY_KEY'
SECRET_KEY='KEY_SEC_KEY'
SESSION_TOKEN='MY_LONG_TOKEN/Gm+YYzuj46'

path = "s3a://prod-v2-datalake/dc-dataload/data/warehouse/tablespace/external/hive/us_customers_avro/000000_0"

os.environ["AWS_ACCESS_KEY_ID"]=ACCESS_KEY
os.environ["AWS_SECRET_ACCESS_KEY"]=SECRET_KEY
os.environ["AWS_SESSION_TOKEN"]=SESSION_TOKEN
os.environ["AWS_DEFAULT_REGION"] = "us-west-2"
os.environ["AWS_REGION"] = "us-west-2"

s3_opts = {'anon': True, 'use_ssl': False, 'key': ACCESS_KEY, 'secret':SECRET_KEY, 'token': SESSION_TOKEN}
df = dd.read_avro(path, storage_options=s3_opts)

df.compute()
print(df.head())

Error:

Traceback (most recent call last):
  File "/home/ec2-user/.local/share/virtualenvs/profiler-9--y97BQ/lib/python3.8/site-packages/s3fs/core.py", line 720, in _lsdir
    async for c in self._iterdir(
  File "/home/ec2-user/.local/share/virtualenvs/profiler-9--y97BQ/lib/python3.8/site-packages/s3fs/core.py", line 770, in _iterdir
    async for i in it:
  File "/home/ec2-user/.local/share/virtualenvs/profiler-9--y97BQ/lib/python3.8/site-packages/aiobotocore/paginate.py", line 30, in __anext__
    response = await self._make_request(current_kwargs)
  File "/home/ec2-user/.local/share/virtualenvs/profiler-9--y97BQ/lib/python3.8/site-packages/aiobotocore/client.py", line 408, in _make_api_call
    raise error_class(parsed_response, operation_name)
botocore.exceptions.ClientError: An error occurred (AccessDenied) when calling the ListObjectsV2 operation: Access Denied

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "avro_test.py", line 25, in <module>
    df = dd.read_avro(path, storage_options=s3_opts)
  File "/home/ec2-user/.local/share/virtualenvs/profiler-9--y97BQ/lib/python3.8/site-packages/dask/bag/avro.py", line 102, in read_avro
    fs, fs_token, paths = get_fs_token_paths(
  File "/home/ec2-user/.local/share/virtualenvs/profiler-9--y97BQ/lib/python3.8/site-packages/fsspec/core.py", line 657, in get_fs_token_paths
    paths = [f for f in sorted(fs.glob(paths)) if not fs.isdir(f)]
  File "/home/ec2-user/.local/share/virtualenvs/profiler-9--y97BQ/lib/python3.8/site-packages/fsspec/asyn.py", line 118, in wrapper
    return sync(self.loop, func, *args, **kwargs)
  File "/home/ec2-user/.local/share/virtualenvs/profiler-9--y97BQ/lib/python3.8/site-packages/fsspec/asyn.py", line 103, in sync
    raise return_result
  File "/home/ec2-user/.local/share/virtualenvs/profiler-9--y97BQ/lib/python3.8/site-packages/fsspec/asyn.py", line 56, in _runner
    result[0] = await coro
  File "/home/ec2-user/.local/share/virtualenvs/profiler-9--y97BQ/lib/python3.8/site-packages/s3fs/core.py", line 799, in _glob
    return await super()._glob(path, **kwargs)
  File "/home/ec2-user/.local/share/virtualenvs/profiler-9--y97BQ/lib/python3.8/site-packages/fsspec/asyn.py", line 804, in _glob
    allpaths = await self._find(
  File "/home/ec2-user/.local/share/virtualenvs/profiler-9--y97BQ/lib/python3.8/site-packages/s3fs/core.py", line 829, in _find
    return await super()._find(
  File "/home/ec2-user/.local/share/virtualenvs/profiler-9--y97BQ/lib/python3.8/site-packages/fsspec/asyn.py", line 846, in _find
    if withdirs and path != "" and await self._isdir(path):
  File "/home/ec2-user/.local/share/virtualenvs/profiler-9--y97BQ/lib/python3.8/site-packages/s3fs/core.py", line 1480, in _isdir
    return bool(await self._lsdir(path))
  File "/home/ec2-user/.local/share/virtualenvs/profiler-9--y97BQ/lib/python3.8/site-packages/s3fs/core.py", line 733, in _lsdir
    raise translate_boto_error(e)
PermissionError: Access Denied

I have verified the permissions on the AWS IAM role. It cannot be a role issue, as we can read Parquet and ORC files properly.

Can I get some direction on how to load the Avro files? I appreciate your time and attention in advance.

Hi @ce-ankur, welcome to the Dask community!

Is this anon key intended?

And if this is not the issue, could you try to list the bucket with only s3fs?

Hi Guillaumeeb,

Thanks for your attention for this issue.

I followed this from the s3fs docs (S3Fs documentation).
At some point I also tried it with the value False.

Yes, with s3fs we are able to get the object. I can confirm this again, as I cannot remember whether we tried s3fs directly with the Avro files specifically, but the rest of the files were working.

You should definitely not use 'anon': True; just get rid of this option. Also, I notice s3a:// in your URL instead of s3://?

And please first check that you can access this path with plain s3fs calls, just to be sure.
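For instance, something along these lines (an untested sketch, reusing the path from your snippet but with the s3:// scheme and only the credential options):

import dask.bag as db

path = "s3://prod-v2-datalake/dc-dataload/data/warehouse/tablespace/external/hive/us_customers_avro/000000_0"
s3_opts = {"key": ACCESS_KEY, "secret": SECRET_KEY, "token": SESSION_TOKEN}

bag = db.read_avro(path, storage_options=s3_opts)
print(bag.take(1))  # a Bag has no head(); take() pulls a few records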

Hi @guillaumeeb
I tried as you suggested, here is my code:

import os

import s3fs


ACCESS_KEY='MY_KEY'
SECRET_KEY='KEY_SEC_KEY'
SESSION_TOKEN='MY_LONG_TOKEN/Gm+YYzuj46'


os.environ["AWS_ACCESS_KEY_ID"]=ACCESS_KEY
os.environ["AWS_SECRET_ACCESS_KEY"]=SECRET_KEY
os.environ["AWS_SESSION_TOKEN"]=SESSION_TOKEN
os.environ["AWS_DEFAULT_REGION"] = "us-west-2"
os.environ["AWS_REGION"] = "us-west-2"


s3 = s3fs.S3FileSystem(key=ACCESS_KEY, secret=SECRET_KEY, token=SESSION_TOKEN)

avro_path = "s3a://eng-sdx-longrunning-v2-datalake/dc-dataload-vr/data/warehouse/tablespace/external/hive/us_customers_avro/000000_0"

with s3.open(avro_path, 'rb') as f:
    print(f.read())

This program works, as we are able to read the file. Another version of the same program, which also works, is given below:

import os

import s3fs


ACCESS_KEY='MY_KEY'
SECRET_KEY='KEY_SEC_KEY'
SESSION_TOKEN='MY_LONG_TOKEN/Gm+YYzuj46'


os.environ["AWS_ACCESS_KEY_ID"]=ACCESS_KEY
os.environ["AWS_SECRET_ACCESS_KEY"]=SECRET_KEY
os.environ["AWS_SESSION_TOKEN"]=SESSION_TOKEN
os.environ["AWS_DEFAULT_REGION"] = "us-west-2"
os.environ["AWS_REGION"] = "us-west-2"


s3 = s3fs.S3FileSystem(key=ACCESS_KEY, secret=SECRET_KEY, token=SESSION_TOKEN)

avro_path = "s3a://eng-sdx-longrunning-v2-datalake/dc-dataload-vr/data/warehouse/tablespace/external/hive/us_customers_avro/000000_0"

s3.get(avro_path, './us_customers_avro.csv')

print(open('us_customers_avro.csv', 'r').read())  # this line raises an error because of decoding issues with the binary file, but the s3.get operation itself is successful

So I am not sure what else we could do differently in this small program. Let me know if you have any other directions.

Did you try the Dask code without the anon and use_ssl keys? It seems you are not using them in the plain s3fs calls.

cc @martindurant also.

I agree that the anon and ssl flags appear fishy, since they are not there in the successful DataFrame call at the top or when using s3fs directly. Where did those “options” come from?

Note that the error comes from a listing operation. So the correct test of s3fs is s3.ls(avro_path) or the parent of that path. I’m not quite sure what the avro code calls right now, but it’s conceivable that it contains a listing call where the parquet code does not.
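That is, something along these lines (a sketch reusing the credentials and avro_path from your snippets):

import s3fs

s3 = s3fs.S3FileSystem(key=ACCESS_KEY, secret=SECRET_KEY, token=SESSION_TOKEN)

# listing exercises ListObjectsV2, which is what the traceback shows failing
print(s3.ls(avro_path))
print(s3.ls(avro_path.rsplit("/", 1)[0]))  # parent prefix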

Somewhat related: ak.from_avro_file (Awkward Array documentation) is a fast vectorized Avro reader, useful if your data is not simply tabular but nested/variable-length. It may be better for tabular data too. This call has not yet been wrapped by dask-awkward; otherwise you would have a much more efficient way to process this data with Dask. You could still use the fast method with dask.delayed, though, and build your dataframe that way. It takes a file-like object, so you would have a function like:

import awkward as ak
import dask
import fsspec

@dask.delayed
def read_one(path):
    # read one Avro object into an Awkward array, then convert to pandas
    with fsspec.open(path, "rb") as f:
        return ak.to_dataframe(ak.from_avro_file(f))
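A rough follow-up sketch (assuming paths is a placeholder list of the Avro object URLs) to assemble the delayed pieces into a Dask DataFrame:

import dask.dataframe as dd

# `paths` is a placeholder for your list of Avro object URLs on S3
parts = [read_one(p) for p in paths]
df = dd.from_delayed(parts)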

Yes, I tried removing both anon and use_ssl. Still the same results.