I’m stumped. I’m trying out the AWS SageMaker Dask example on my own data, which sits in S3. For now I’m focusing on retrieving the data; processing comes later. I read my files into a Dask dataframe, but when I run len(df) to count the rows, the code fails with the traceback below.
File "/opt/conda/lib/python3.8/site-packages/pyarrow/parquet.py", line 401, in read
return self.reader.read_all(column_indices=column_indices,
File "pyarrow/_parquet.pyx", line 1139, in pyarrow._parquet.ParquetReader.read_all
File "/opt/conda/lib/python3.8/site-packages/fsspec/spec.py", line 1483, in read
out = self.cache._fetch(self.loc, self.loc + length)
File "/opt/conda/lib/python3.8/site-packages/fsspec/caching.py", line 40, in _fetch
return self.fetcher(start, stop)
File "/opt/conda/lib/python3.8/site-packages/s3fs/core.py", line 1920, in _fetch_range
return _fetch_range(
File "/opt/conda/lib/python3.8/site-packages/s3fs/core.py", line 2070, in _fetch_range
return sync(fs.loop, resp["Body"].read)
File "/opt/conda/lib/python3.8/site-packages/fsspec/asyn.py", line 67, in sync
raise FSTimeoutError
fsspec.exceptions.FSTimeoutError
I’ve tried raising the timeout limit to 3h in the Dask YAML config file, but to no avail.
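For reference, this is roughly the equivalent of what I changed, expressed through dask.config instead of the YAML file. I’m assuming distributed.comm.timeouts is the relevant section, which may well not be the setting that governs the s3fs/fsspec read:

import dask

# Raise the distributed comm timeouts to 3 hours (assumed to match the
# "limit" I edited in the Dask YAML config; may not be the right knob).
dask.config.set({
    "distributed.comm.timeouts.connect": "3h",
    "distributed.comm.timeouts.tcp": "3h",
})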
The code:
import argparse
import logging
import sys

import boto3
import dask.dataframe as dd
from dask.distributed import Client

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--train-test-split-ratio", type=float, default=0.3)
    args, _ = parser.parse_known_args()

    # Get processor script arguments
    args_iter = iter(sys.argv[1:])
    script_args = dict(zip(args_iter, args_iter))
    scheduler_ip = sys.argv[-1]

    # S3 clients
    s3_region = script_args["s3_region"]
    s3_client = boto3.resource("s3", region_name=s3_region)
    s3_client2 = boto3.client("s3", region_name=s3_region)
    print(f"Using the {s3_region} region")

    # Start the Dask cluster client
    try:
        client = Client("tcp://{ip}:8786".format(ip=scheduler_ip))
        logging.info("Printing cluster information: {}".format(client))
    except Exception as err:
        logging.exception(err)

    # Read the parquet files, persist them on the cluster, then count rows
    df_1 = dd.read_parquet("s3://bucket/prefix/data_0_*.parquet")
    df_2 = client.persist(df_1)
    print(len(df_2))
The dataframe only holds about 3 GB of data, so I don’t understand why this fails at all; pandas would have gone through it in seconds, minutes at worst. Maybe the work isn’t being distributed correctly?
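To check that, I was planning to inspect the partition count and the workers the scheduler sees. These are generic Dask calls, not something taken from the SageMaker example:

# How many partitions Dask created from the parquet files
print(df_1.npartitions)

# Which workers the scheduler currently knows about
print(client.scheduler_info()["workers"].keys())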