Dask on AWS Sagemaker Exception: 'FSTimeoutError()'

I’m stumped. I’m trying out the AWS Sagemaker Dask example on my own data that sits in S3. Currently I’m focusing on retrieving the data and then I’ll process it. I extract my files in a dask dataframe, however, when I run len(df) to see the number of rows in my dataframe the code fails.

  File "/opt/conda/lib/python3.8/site-packages/pyarrow/parquet.py", line 401, in read
    return self.reader.read_all(column_indices=column_indices,
  File "pyarrow/_parquet.pyx", line 1139, in pyarrow._parquet.ParquetReader.read_all
  File "/opt/conda/lib/python3.8/site-packages/fsspec/spec.py", line 1483, in read
    out = self.cache._fetch(self.loc, self.loc + length)
  File "/opt/conda/lib/python3.8/site-packages/fsspec/caching.py", line 40, in _fetch
    return self.fetcher(start, stop)
  File "/opt/conda/lib/python3.8/site-packages/s3fs/core.py", line 1920, in _fetch_range
    return _fetch_range(
  File "/opt/conda/lib/python3.8/site-packages/s3fs/core.py", line 2070, in _fetch_range
    return sync(fs.loop, resp["Body"].read)
  File "/opt/conda/lib/python3.8/site-packages/fsspec/asyn.py", line 67, in sync
    raise FSTimeoutError
fsspec.exceptions.FSTimeoutError

I’ve tried changing limit to 3h on the dask ymal file but to no avail.
The code:

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--train-test-split-ratio", type=float, default=0.3)
    args, _ = parser.parse_known_args()

    # Get processor scrip arguments
    args_iter = iter(sys.argv[1:])
    script_args = dict(zip(args_iter, args_iter))
    scheduler_ip = sys.argv[-1]

    # S3 client
    s3_region = script_args["s3_region"]
    s3_client = boto3.resource("s3", s3_region)
    s3_client2 = boto3.client("s3", s3_region)
    print(f"Using the {s3_region} region")

    # Start the Dask cluster client
    try:
        client = Client("tcp://{ip}:8786".format(ip=scheduler_ip))
        logging.info("Printing cluster information: {}".format(client))
    except Exception as err:
        logging.exception(err)
    df_1 = dd.read_parquet('s3://bucket/prefix/data_0_*.parquet')
    df_2 = client.persist(df_1)
    print(len(df_2))

The df only holds 3GB worth of data so I don’t understand at all why it would fail. Pandas would have executed it in seconds if not minutes. Maybe its not distributing correctly?

@Hasna94 Welcome to Discourse!

I was able to reproduce this with a LocalCluster and looks like your explicit boto3 client is interfering with Dask’s internals (Dask also uses boto3 internally to connect to S3)

So, I believe using read_parquet directly will work in your case (no need to use the # S3 client section):

import dask.dataframe as dd
from dask.distributed import Client

client = Client()

ddf = dd.read_parquet("s3://coiled-datasets/nyc-taxi/parquet",
                      storage_options={"anon": True, "use_ssl": True},
                     )

ddf.head()
1 Like