We have a use case where we are reading many parquet files that might have different schemas. To parallelize this, we submit the read_parquet calls to workers and concatenate the resulting dataframes on the client. [By the way, the new from_map looks like a good function for us.]
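Roughly, the pattern we are currently using looks like the following sketch (the paths and helper name are placeholders, just to illustrate the hack):

import dask.dataframe as dd
from dask.distributed import Client

client = Client()

paths = ["file_a.parquet", "file_b.parquet"]  # placeholder file list

def read_one(path):
    # runs on a worker and returns a (lazy) dask dataframe for one file
    return dd.read_parquet(path)

# one read_parquet per file, executed on workers
futures = client.map(read_one, paths)
frames = client.gather(futures)

# concatenated on the client into the final dataframe
df = dd.concat(frames)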
We noticed that, when reading the parquet files on workers, it takes much, much longer to serialize the dask graph of the final dataframe to send it to the scheduler: many minutes when reading on workers vs. a few seconds when reading locally on the client.
By investigating a bit, we noticed that the layers in the distributed version are all MaterializedLayers, while the layers created locally are DataFrameIOLayers.
We can reproduce this transformation from DataFrameIOLayer to MaterializedLayer simply by pickling the former:
import pickle
import dask.dataframe as dd

df = dd.read_parquet('any.parquet')

# the first layer of the graph is a DataFrameIOLayer
key = list(df.dask.layers.keys())[0]
read_parquet_layer = df.dask.layers[key]
print(read_parquet_layer)

# after a pickle round-trip it becomes a MaterializedLayer
roundtripped_layer = pickle.loads(pickle.dumps(read_parquet_layer))
print(roundtripped_layer)
So we would like to understand a bit better:
Why is the DataFrameIOLayer transformed to a MaterializedLayer when pickled?
Why is the MaterializedLayer so much heavier to serialize?
You should not, generally speaking, mix the collections API (dd.*) with the client.map/submit or delayed APIs. Why not just dd.read_parquet(many_parquet_files)?
Nevertheless, I would expect the DataFrameIOLayer to serialise efficiently, so you may well want to report this as an issue in Dask.
The problem is that these files might have different schemas (there might be columns in some of the files that are not available in others), so when we use dd.read_parquet we get an error.
That’s why we are reading each file separately and concatenating them, but doing this sequentially on the client takes a long time, so we are using workers to do it.
I agree with @martindurant that it’s usually not a good idea to generate dask collections from within dask tasks, as it can result in hard-to-debug failures, and is generally not very ergonomic.
Might I suggest using the new dd.from_map() dataframe creation function? That would allow you to be somewhat more flexible about creating your partitions, and you could use it to do some additional processing on them to ensure that they have compatible schemas. So your example might look something like the sketch below.
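(This is only a rough sketch: the reindex step is just one way to make the schemas compatible, and the paths and column list are placeholders.)

import pandas as pd
import dask.dataframe as dd

paths = ["file_a.parquet", "file_b.parquet"]  # your list of files
all_columns = ["a", "b", "c"]  # union of columns expected across files

def read_and_align(path):
    # read a single file with pandas, then reindex so every partition
    # ends up with the same set of columns (missing ones become NaN)
    pdf = pd.read_parquet(path)
    return pdf.reindex(columns=all_columns)

df = dd.from_map(read_and_align, paths)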
If that fixes your issue with schemas, then you shouldn’t need to worry about pickling layers at all. But here are some attempts to address your questions about them:
This is deliberate! See the implementation of __reduce__ here. A longer answer: you’re not really supposed to pickle layers. Historically, they have used their own serialization protocol in __dask_distributed_pack__ and __dask_distributed_unpack__. This was done to avoid security problems with pickle. However, Dask may be removing that constraint in the near future, at which point you would be free to pickle/unpickle whenever you want without materializing the graph (though you should still try to avoid that and use more public APIs like from_map).
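To illustrate the mechanism with a toy example (these are not dask’s actual classes, just the same idea as the default __reduce__):

import pickle
from collections.abc import Mapping

class MaterializedLayer(dict):
    """Stand-in for a materialized layer: a plain dict of tasks."""

class LazyLayer(Mapping):
    """Toy layer that only stores what it needs to generate its tasks."""

    def __init__(self, n):
        self.n = n

    def __getitem__(self, key):
        return ("load", key)

    def __iter__(self):
        return iter(range(self.n))

    def __len__(self):
        return self.n

    def __reduce__(self):
        # same idea as the default Layer.__reduce__: pickling falls back
        # to a fully materialized dict of tasks
        return (MaterializedLayer, (dict(self),))

layer = LazyLayer(3)
roundtripped = pickle.loads(pickle.dumps(layer))
print(type(roundtripped))  # MaterializedLayer
print(roundtripped)        # {0: ('load', 0), 1: ('load', 1), 2: ('load', 2)}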
Materialized layers consist of a fully-generated task graph, which could be quite large (probably tens or hundreds of thousands of tasks in your case). Unmaterialized layers, by contrast, contain just the information they need to produce their tasks, which is typically much smaller. You might think of it like the difference between a generator and a list in Python.
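A quick way to see that size difference (toy example, not dask-specific; using range rather than a generator because generators can’t be pickled):

import pickle

recipe = range(1_000_000)  # "unmaterialized": just the parameters
expanded = list(recipe)    # "materialized": every element spelled out

print(len(pickle.dumps(recipe)))    # tens of bytes
print(len(pickle.dumps(expanded)))  # several megabytes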
Once again, thanks for taking the time to answer our questions.
Our current solution of using workers to read the parquet files is definitely a hack, and dd.from_map will help make it cleaner. We were unsure whether we should use it given the warning in the docs, but I think it’s the way to go.
I have a follow-up question (maybe it’s not related and I should open another thread?).
We’ve noticed that due to the number of parquet files we need to read we’re using a lot of memory on the scheduler (~6k files will make the scheduler use around 16GB of memory). I believe this is caused by the size of the materialized task graph.
This is currently the bottleneck in some of the jobs we run. Is there a way for us to “circumvent” this? (Meanwhile we’ll be trying to improve the scale by reprocessing and grouping some of the files).
This is surprising to me! The storage needed for ~6K tasks should be much smaller than this (in the simplest case, it needn’t be much larger than the string path to the file, so a few tens of bytes per task). Is there anything particularly heavy in your task definitions? Are you embedding a numpy array or pandas dataframe in them somewhere?
There isn’t really a way around eventually materializing the graph, so I’d focus on figuring out why it is so memory-heavy.
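One rough diagnostic you could try (a sketch only; the scheduler’s internal representation differs, but it gives an order of magnitude) is to materialize and pickle the graph yourself and see how big it is:

import pickle
import dask.dataframe as dd

df = dd.read_parquet("any.parquet")  # or however you build the collection

# flatten all layers into a plain dict of tasks, then measure it
tasks = dict(df.__dask_graph__())
print(f"{len(tasks)} tasks, ~{len(pickle.dumps(tasks)) / 1e6:.1f} MB pickled")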
I just wanted to give you an update on this: we implemented the solution using from_map and it works much better. It also solved the memory problems on the scheduler side.
As a side note, I think the memory problem happened when we had too many partitions (files to read) with a very high number of columns. Each layer was sent to the scheduler with the column metadata, and that consumed too much memory. We didn’t really investigate it, so it’s just a hunch.