Context
The goal of my code is to:
- read many (~30k) files (>10 TB in total, each 3 MB - 3 GB in size); these are in a non-standard format and have no Dask readers,
- extract features from them (the features are small, typically only 1 - 10% of the original file size),
- save the features to a dataframe on local disk so I can use them later for dataframe-style analysis (groupbys, correlations, conditional means, etc.; see the sketch just after this list).
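To make the last point concrete, the end product is just an ordinary feature table I can query later; the columns below are made up purely for illustration:

import pandas as pd

# Purely illustrative: the kind of per-file feature table I end up with,
# and the kind of analysis I want to run on it later.
features = pd.DataFrame({
    'file': ['a.bin', 'b.bin', 'c.bin'],
    'file_type': ['x', 'x', 'y'],
    'feature_a': [0.1, 0.7, 0.3],
    'feature_b': [12, 5, 9],
})
features.groupby('file_type')['feature_a'].mean()  # conditional means
features[['feature_a', 'feature_b']].corr()        # correlations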
I’m using Dask 2022.02.1 on a Debian 10 cluster. I’m using the distributed futures API because part of the feature extraction involves reference data, which I scatter to each worker (a sketch of the extraction function is below).
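For reference, `load` (used in the code below) is my own reader. The real implementation is long and specific to the file format; only the signature and return type matter here, so the body is a placeholder:

import pandas as pd

def load(path: str, reference) -> pd.DataFrame:
    # Parse one non-standard file (3 MB - 3 GB) using the scattered
    # reference data and return a small pandas DataFrame of features
    # (same illustrative columns as the table above).
    with open(path, 'rb') as f:
        raw = f.read()
    # ... custom feature extraction involving `reference` goes here ...
    return pd.DataFrame({'file': [path], 'file_type': ['x'],
                         'feature_a': [0.0], 'feature_b': [len(raw)]})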
Problem
My code works fine at small scale when I write the dataframe to HDF format. However, when I run the code on all the data, it crashes with the following exception:
distributed.worker - ERROR - Worker stream died during communication: tcp://127.0.0.1:39109
Traceback (most recent call last):
File "/home/rob/miniconda3/envs/bax/lib/python3.9/site-packages/distributed/comm/tcp.py", line 211, in read
frames_nbytes = await stream.read_bytes(fmt_size)
asyncio.exceptions.CancelledError
Code
import dask.dataframe as dd
from dask.distributed import Client, LocalCluster

cluster = LocalCluster(n_workers=8,
                       threads_per_worker=1,
                       dashboard_address='localhost:8888',
                       memory_limit='10GiB')
client = Client(cluster)
client.amm.start()  # active memory manager

reference = load_reference_data(path_to_reference_data)
ref_future = client.scatter(reference, broadcast=True)

pdf_futures = []  # pdf = 'pandas dataframe' feature df created by `load`
for path in paths:  # list of paths to files, len(paths) ~ 30k
    pdf_futures.append(client.submit(load, *(str(path), ref_future)))

ddf_future = dd.from_delayed(pdf_futures, meta=dict_of_col_types)
dd.to_hdf(ddf_future, path='output.h5', key='output', scheduler=client, compute=True)

client.shutdown()
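Here `dict_of_col_types` is the meta I pass to from_delayed: a plain mapping of column names to dtypes matching what `load` returns, along the lines of (again, illustrative column names):

dict_of_col_types = {'file': 'object', 'file_type': 'object',
                     'feature_a': 'float64', 'feature_b': 'int64'}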
Things I have tried
I’ve tried to write out individual dataframes but I can’t get this to work. Here are some attempts:
Attempt 1:
results = []
for path in sample_paths:
    pdf_future = client.submit(load, *(str(path), ref_future))
    ddf = dd.from_delayed([pdf_future], meta=dict_of_col_types)
    write = ddf.to_hdf(out_dir.joinpath(Path(path.with_suffix('.hdf').name)),
                       key='features',
                       compute=False, scheduler=client)
    results.append(write)
client.compute(*results)
I get this error:
TypeError: Truth of Delayed objects is not supported
Attempt 2:
results = []
for path in sample_paths:
    pdf_future = client.submit(load, *(str(path), ref_future))
    ddf_future = client.submit(dd.from_delayed, pdf_future, meta=dict_of_col_types)
    write = client.submit(dd.to_hdf, ddf_future,
                          path=out_dir.joinpath(Path(path.with_suffix('.hdf').name)),
                          key='features',
                          compute=False)
    results.append(write)
client.gather(results)
I get this error:
---> 32 client.gather(results)
File ~/miniconda3/envs/bax/lib/python3.9/site-packages/distributed/client.py:2162, in Client.gather(self, futures, errors, direct, asynchronous)
2160 else:
2161 local_worker = None
-> 2162 return self.sync(
2163 self._gather,
2164 futures,
2165 errors=errors,
2166 direct=direct,
2167 local_worker=local_worker,
2168 asynchronous=asynchronous,
2169 )
File ~/miniconda3/envs/bax/lib/python3.9/site-packages/distributed/utils.py:311, in SyncMethodMixin.sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
...
--> 601 raise TypeError("Expected Delayed object, got %s" % type(df).__name__)
603 if meta is None:
604 meta = delayed(make_meta)(dfs[0]).compute()
TypeError: Expected Delayed object, got str
After some investigation, this error is coming from the from_delayed step.
Question
I am not particularly tied to any of these methods - I just really want to know: how can I achieve my goals?
The only other thing I’m concerned about is that my features may sometimes be huge (many columns), although they are not in this case.
Thanks in advance!