I have a data processing pipeline that uses Dask for the early stages, but I then need to get the data into a large NumPy array for further processing with existing code. The array is around 250k x 25k complex64. I have tried using futures and slices from chunks to parallelize the copy, but it is still slow. Any thoughts on the best way to go from a Dask array to a NumPy array?
import dask.array as da
from dask.distributed import as_completed
from tqdm import tqdm

def dask_to_numpy(dask_array, numpy_array):
    # `client` is the existing dask.distributed Client for the cluster.
    # Submit one compute per chunk and remember which slice of the output it fills.
    future_map = {}
    for _slice in da.core.slices_from_chunks(dask_array.chunks):
        future = client.compute(dask_array[_slice])
        future_map[future] = _slice
    # Copy each chunk into the pre-allocated numpy array as it finishes.
    for future in tqdm(as_completed(future_map.keys()),
                       desc="Copying chunks to numpy array",
                       total=len(future_map)):
        _slice = future_map[future]
        numpy_array[_slice] = future.result()
        future = None  # drop the reference so the worker can release the chunk
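For context, this is roughly how I call it; the client setup and array allocation here are just illustrative stand-ins for what the real pipeline does:

import numpy as np
from dask.distributed import Client

client = Client()  # illustrative; the real cluster setup lives elsewhere
# `dask_array` is the ~250k x 25k complex64 array produced by the earlier Dask stages.
numpy_array = np.empty(dask_array.shape, dtype=dask_array.dtype)  # pre-allocated target
dask_to_numpy(dask_array, numpy_array)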
My chunk size is 1024 x 25k. Maybe that’s too large and is killing performance?
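If the chunks are the problem, I could rechunk before the copy, something like this (the 256-row chunk size is just a guess, not something I have benchmarked):

# Split each 1024 x 25k chunk into smaller row blocks before computing,
# in case smaller per-future transfers behave better.
smaller_chunks = dask_array.rechunk((256, dask_array.shape[1]))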
This is also slow:
from dask.distributed import progress

def dask_to_numpy(dask_array, numpy_array):
    # Persist the whole array on the cluster, show progress, then copy it back.
    persisted = dask_array.persist()
    progress(persisted)
    numpy_array[:] = persisted.compute()
What I have left out is the routine that pre-allocates numpy_array as either an in-memory array or a memmap, depending on the array size and available memory.
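That routine looks roughly like the sketch below; the name allocate_output, the 50%-of-available-memory threshold, and the temporary-file handling are simplified placeholders for what the real code does:

import tempfile
import numpy as np
import psutil

def allocate_output(shape, dtype=np.complex64):
    # Use an in-memory array when it fits comfortably in RAM,
    # otherwise fall back to a temporary memory-mapped file.
    nbytes = int(np.prod(shape)) * np.dtype(dtype).itemsize
    if nbytes < 0.5 * psutil.virtual_memory().available:
        return np.empty(shape, dtype=dtype)
    tmp = tempfile.NamedTemporaryFile(suffix=".dat", delete=False)
    return np.memmap(tmp.name, dtype=dtype, mode="w+", shape=tuple(shape))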