When to repartition if the data is not computed?

My question is regarding the computation of Dask graphs. This is my current understanding: I read multiple files in and partition them by size, 50MB for example. Now I do drop_duplicates and groupby’s and so on, and then save the resulting partitions back to disk, one file per partition. This will prevent my process from running out of memory, because I am not calling compute() anywhere until the files are saved.
What I am confused about is the relation to the “split” (_every and _out) parameters found in a lot of the methods, for example groupby.last(). My .last() result is MOST of the time around the same size as the original data, but not ALWAYS, so I don’t want to use a magic number like split_out=8: I don’t want to create unnecessary partitions if the result is small enough (and if the data is reduced to fewer than 8 entries, which SHOULD not happen but CAN, this will even cause a runtime error). But will the process run out of memory if split_out is not specified and defaults to 1 partition? Does this even have an impact if I don’t compute the data until it is saved to disk again (and repartitioned by size right before)?

I guess my question is, WHEN should you repartition at all? Does it have a memory impact if not computed during the process, when lazy methods are still being called? And why do these functions only have a split_out parameter for the number of partitions and not a partition_size option like repartition()?

Hi Straegge, thanks for the question! I split out the answers below.

The ideal number of partitions after your aggregation will depend on the size of your dataset. If the size of the dataset is roughly the same as before the processing steps you mentioned, then it probably makes sense not to change the number of partitions. split_out defaults to 1 because the size of the dataset is often reduced quite a bit after a groupby (or other aggregation). For your dataset, is the number of groups the same order of magnitude as the size of your original dataset?

For repartitioning, it can be useful after an aggregation, as you’ve mentioned, but it can also be useful when reading in data that was poorly partitioned or after filtering a dataset. There’s some additional guidance here.

Partitioning will certainly have an impact even if you never call compute() yourself, because the data has to be computed before it is saved to disk. How large that impact is will depend, as mentioned above, on the size of your dataset.

Hi scharlottej13, thank you for taking the time to respond, I really appreciate it.

"For your dataset, is the number of groups the same order of magnitude as the size of your original dataset?"

That’s part of the problem: I don’t know, because it depends on the data. For example, I am grouping by a key and then calling .last(), effectively implementing an “update with last” functionality, i.e. “drop rows with duplicate keys but keep the last non-NaN value of each column”. But whether there are 1 or 500k duplicate keys in 1 million rows depends on the data (that many duplicates is unlikely in production, but I don’t feel comfortable simply relying on the data being around the same size).

I was just wondering if there is a built-in solution for this, that’s why I asked why split_out cannot be a partition_size like “100MB”, and if this would be a feature request or designed this way on purpose.

Let’s say my data is around the same size after as before .last(split_out=1), and does not fit in memory. But I call .repartition(npartitions=before.npartitions * (len(after) // len(before))) immediately afterwards. When I call .compute(), will my process run OOM, because for the short time between .last() and .repartition(), the 1 partition will be loaded completely into memory to repartition it? Or will Dask recognize that the .last() result will be partitioned immediately and therefore not load the resulting single, large partition completely into memory?

This is where my confusion lies: I don’t fully understand when exactly my process runs the risk of going OOM, because I don’t know how Dask will work with the data during computation. If the example above could run OOM, that is also a valid answer. In that case I have to find a way to calculate the ratio of duplicate keys beforehand and pass it to split_out directly.

Thanks @Straegge for adding more details! I think your last paragraph is correct: running out of memory is a valid concern, and you’ll probably want to find a way to determine the ideal number of partitions and pass that to split_out. I can follow up with a visualization of how this task graph might look.

Here’s an example using repartition and you can see the possibility for a bottleneck, as you’ve pointed out:

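import dask.dataframe as dd
import pandas as pd

# 20 rows with 10 distinct keys, split across 5 input partitions
df = pd.DataFrame({"a": [1, 1, 2, 2, 3, 3, 4, 4, 5, 5, 6, 6, 7, 7, 8, 8, 9, 9, 10, 10]})
ddf = dd.from_pandas(df, npartitions=5)
# split_out=1 collects the aggregated result in one partition before it is repartitioned
ddf.groupby("a").last(split_out=1).repartition(npartitions=5).visualize()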

[Image: task graph for the split_out=1 plus repartition example]

And then without repartition, but changing the number of partitions with split_out:

import dask.dataframe as dd
import pandas as pd

# Same toy data as above
df = pd.DataFrame({"a": [1, 1, 2, 2, 3, 3, 4, 4, 5, 5, 6, 6, 7, 7, 8, 8, 9, 9, 10, 10]})
ddf = dd.from_pandas(df, npartitions=5)
# split_out=5 writes the aggregated result directly into 5 output partitions
ddf.groupby("a").last(split_out=5).visualize()

[Image: task graph for the split_out=5 example]

Hi scharlottej13, thank you very much for your reply.

I was not aware of the visualization feature, or at least not that it is this helpful! This greatly helps me to understand the issue and I will certainly make use of this whenever I have doubts about the graphs.

One last question perhaps. To calculate the ideal split_out I think I will have to use len(ddf) at some point to know how much data I am currently dealing with. Is this “safe” from a memory perspective, or does Dask compute the entire DataFrame to get the current length? In that case I’m guessing something like this would work:
length = ddf.map_partitions(len).compute().sum()

I’m glad you find the visualizations helpful!

I think you’re right, at some point you’ll have to figure out how much data you’re working with after the processing steps you mentioned. len() will compute immediately, so this could be slow for a large dataset. Another option that might give you more flexibility is the size attribute, which is lazy. It’s a bit different from len(), since for a DataFrame it counts elements (rows times columns) rather than rows, as you can see here.