Dask persist generates a large graph

Hi all,

I am getting large graphs when I persist data.

I have created the following example:

from dask.distributed import Client, LocalCluster
import dask.array as da
import dxchange

def read_data(filename, dataset):
    """
    Read dataset into dask array
    """
    data = dxchange.reader.read_hdf5(filename, dataset)
    data = da.from_array(data, chunks={0: "auto", 1: -1, 2: -1})
    return data.persist()

filename = "file.h5"
dataset = "/exchange/data"

client = Client(LocalCluster())
data_f = client.submit(read_data, filename, dataset)


This code generates the error:

distributed/client.py:3371: UserWarning: Sending large graph of size 195.27 MiB. This may cause some slowdown. Consider loading the data with Dask directly or using futures or delayed objects to embed the data into the graph without repetition.

This warning is only generated if the data is persisted.
The warning recommends using futures or delayed objects, but I don’t see what is wrong with this implementation.

It seems that the data is added to the graph when it is persisted.
I would expect the graph to hold a reference to the data, not the data itself.
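To make this concrete, here is a minimal standalone check I put together (a plain NumPy array standing in for the HDF5 data, and pickle just standing in for whatever serialization the scheduler actually uses), suggesting that the graph of a from_array collection does carry the array’s bytes:

```python
import pickle

import numpy as np
import dask.array as da

big = np.ones((500, 500))                  # ~2 MB of data held in memory
x = da.from_array(big, chunks=(250, 500))  # same pattern as in read_data

# Serializing the task graph ships the array's bytes along with it:
# the pickled graph is at least as large as the data it embeds.
graph_bytes = pickle.dumps(dict(x.__dask_graph__()))
assert len(graph_bytes) > big.nbytes
```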

Could anyone cast some light on this?

Hi @Gabriel, welcome to Dask community!

I think the trick here is that you submit an object that is already a Dask collection; you shouldn’t have to do this, whether or not you use persist.

You might only want to submit or delay the dxchange.reader.read_hdf5 call if it is blocking. Submitting a function that returns a Dask Array is not common, and persisting it is probably even less common.

Just to be clear, I’m suggesting something like:

from dask.distributed import Client, LocalCluster
import dask.array as da
from dask import delayed
import dxchange

filename = "file.h5"
dataset = "/exchange/data"

fut_array = delayed(dxchange.reader.read_hdf5)(filename, dataset)
# from_delayed needs the shape and dtype of the resulting array up front
# (data_shape and data_dtype are placeholders for your dataset's metadata)
data = da.from_delayed(fut_array, shape=data_shape, dtype=data_dtype)
data = data.persist()

Thank you @guillaumeeb.

The point of submitting the read_data method is to do all the heavy operations (such as reading and processing big data) on the computing cluster, and to set up the data workflow locally using only futures.

If I create the dask array from a delayed object I get a single-chunk dask array, which is heavy and difficult to handle.

I understand that; my proposal doesn’t change it. As it is, you’ll read the whole dataset on a single worker as a single chunk, and then rechunk it.

Yes, I agree, but this is already the case, except that you rechunk it afterwards (when using from_array). You can also do this manually after reading the single chunk with my code. But what would be better is to read in a chunked way from the start; I’m not sure whether that is possible with the dxchange package, but it should be with plain HDF5 reading through Dask.
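As a sketch of what I mean, something along these lines using h5py plus da.from_array reads chunked from the start (a toy file stands in for file.h5 here; whether dxchange exposes the underlying h5py dataset, I don’t know):

```python
import os
import tempfile

import h5py
import numpy as np
import dask.array as da

# Toy file standing in for file.h5 with the "/exchange/data" dataset
path = os.path.join(tempfile.mkdtemp(), "file.h5")
with h5py.File(path, "w") as f:
    f.create_dataset("/exchange/data", data=np.arange(24).reshape(2, 3, 4))

# Open the dataset lazily: chunks are read on demand at compute time,
# so the whole array is never loaded up front as a single block.
h5f = h5py.File(path, "r")
data = da.from_array(h5f["/exchange/data"], chunks={0: 1, 1: -1, 2: -1})

assert data.chunks[0] == (1, 1)     # chunked along axis 0 from the start
assert data.sum().compute() == 276  # same data as on disk
```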