Dask computation takes way too much memory

Hello! I wanted to ask because I’m probably missing some optimization.

I do the following:

import dask.dataframe as dd
import distributed
import pandas as pd

# Read in the data.
df = dd.read_parquet(..., calculate_divisions=True, index='Time')
# I need to groupby('ProductCode').resample('1s') the sub-1-second data within each
# ProductCode group.  Doing that directly has been OOMing a lot, so written like this it doesn't.
resampled = dd.from_pandas(pd.DataFrame(), npartitions=df.npartitions)
for key in df['ProductCode'].unique():
    per_product = df[df['ProductCode'] == key].reset_index().set_index(['Time'], sort=True).resample('1s').agg({
        'ProductCode': 'first',
        'a': 'first',
        'b': 'max',
        'c': 'min',
        'd': 'last',
        'e': 'sum',
        'f': 'sum',
        'g': 'max',
        'h': 'sum',
        'i': 'sum'}).dropna()
    distributed.print('Resampled {}'.format(key))
    resampled = dd.multi.concat([resampled, per_product])
# Now write out the results
resampled.to_parquet(..., overwrite=True)

My source data is ~20 GB of Parquet on disk (2172 files, ~8 MB each). My machine has 12 cores and 128 GB of RAM.

I’ve been running this as distributed.Client(distributed.LocalCluster(memory_limit='20GB', n_workers=5)) and it crashes in the writing-out part with “kernel is restarting”, which I take to mean out of memory. Same with memory_limit='20GB', n_workers=5, threads_per_worker=2. I have now downgraded it to memory_limit='50GB', n_workers=2, threads_per_worker=1 and it is slowly writing out the results in the last step while taking 80 GB of RAM.

I added reset_index().set_index() before resample() because otherwise it was getting stuck on the resample. I now suspect that my graph is big, which may be why it sometimes takes a long time (20+ minutes) to start the computation.

Is there a better strategy to accomplish this than reducing the number of workers so that each one gets more RAM? Should I be persist()-ing things?

Finally, does the last to_parquet step take so much memory because it is redoing the whole computation, starting with resample or even read_parquet?

This is the status page during to_parquet with two workers.

The two workers didn’t make much progress after a while.
I got it done by writing out a separate to_parquet for each resampled group. Then I read the many per-product Parquet files with delayed + pd.read_parquet and wrote them all out in one dd.to_parquet(dd.from_delayed(...)), roughly as sketched below.
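For reference, the general shape of that workaround (paths, intermediate file names and the product_codes list are placeholders, not my exact code):

import dask
import dask.dataframe as dd
import pandas as pd

# Step 1 (inside the per-product loop above): write each resampled group to
# its own Parquet dataset, e.g. per_product.to_parquet(f'intermediate/{key}').

# Step 2: lazily read each per-product dataset back with pandas and combine
# them into a single Dask dataframe, then write one final dataset.
parts = [dask.delayed(pd.read_parquet)(f'intermediate/{key}')
         for key in product_codes]  # product_codes: the same keys as in step 1
dd.from_delayed(parts).to_parquet('final_output/', overwrite=True)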

However, this feels unsatisfactory. It seems my strategy for scaling when faced with OOMs is to reduce the number of workers so each gets more RAM and/or to introduce intermediate reads/writes.

Is there a way to do better? Thanks.

Hi @igr, welcome to the Dask community!

It’s hard to debug without having access to the data or a reproducer, but I’ll try to highlight some important points.

First, what was your code using groupby? What was the error you encountered? That should definitely be the approach here, probably using groupby().apply().

If it’s your kernel, then it’s not the workers but the main Python process that is using too much memory. I’m not sure what in your code causes memory to grow on the main process side.

This (the df['ProductCode'].unique() call) is an expensive first computation, involving a shuffle over the whole dataframe to find the unique keys.

This is also quite an expensive computation: 4 million tasks in a graph is quite a lot!
According to your screenshot and your code, it seems you have almost 3 million different ProductCodes?

It depends on your data, but it should not use so much memory. Persisting won’t optimize that. Persisting the input dataframe might make the following steps faster, but it will take more memory.
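If you do want to try it, a minimal sketch (assuming a running distributed Client):

from distributed import wait

df = df.persist()  # materialize the read_parquet result in worker memory
wait(df)           # optionally block until every partition is loaded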

The final step triggers the whole computation; it is not redoing it, it is actually doing it for the first time.

Yes, not quite satisfactory. In this case you would be better off using only Pandas for the first part, computing each resampling.
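Something along these lines (a rough sketch only; the paths, the product_codes list, whether 'Time' comes back as a column, and the aggregation spec are assumptions on my side):

import pandas as pd

agg_spec = {'a': 'first', 'b': 'max', 'c': 'min', 'd': 'last',
            'e': 'sum', 'f': 'sum', 'g': 'max', 'h': 'sum', 'i': 'sum'}

for key in product_codes:
    # pandas + pyarrow push this filter down to the Parquet row groups,
    # so only the rows of one product are loaded at a time.
    per_product = pd.read_parquet('source_data/',
                                  filters=[('ProductCode', '==', key)])
    out = (per_product.set_index('Time')   # adjust if 'Time' is already the index
                      .resample('1s')
                      .agg(agg_spec)
                      .dropna())
    out.to_parquet(f'resampled/{key}.parquet')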


Thanks for taking a look.

It’s actually just 53 according to df['ProductCode'].unique().shape[0].compute().

I rewrote the for-loop as:

df = dd.read_parquet(..., calculate_divisions=True, index='Time')
merged = df.groupby(['ProductCode']).apply(lambda group: group.resample('1s').agg({
        'a': 'first',
        'b': 'max',
        'c': 'min',
        'd': 'last',
        'e': 'sum',
        'f': 'sum',
        'g': 'max',
        'h': 'sum',
        'i': 'sum',
    }).dropna())
merged.to_parquet(..., overwrite=True)

It’s not getting stuck like my previous attempts did; however, it fails with

KilledWorker: Attempted to run task ('shuffle_p2p-4e91bef288d199c948c38110a3239403', 2114) on 4 different workers, but all those workers died while running it. The last worker that attempt to run the task was tcp://127.0.0.1:41463. Inspecting worker logs is often a good next step to diagnose what went wrong. For more information see https://distributed.dask.org/en/stable/killed.html.

which I think has correlated with running out of memory in the past, but maybe not; it’s not easy to tell. I even tried with distributed.LocalCluster(memory_limit='33GB', n_workers=3, threads_per_worker=1). Here is its output (Pastebin link): 2023-12-20 11:47:01,095 - distributed.worker.memory - WARNING - Unmanaged memory.

Actually, in the initial version of the code with the explicit for loop, there was an expensive Dask computation on every loop iteration. Interestingly, the new version also does something similar under the hood: according to the task page it goes through 8k tasks (producing a partial final output) and then goes through a new 8k tasks again, as if the graph were re-run multiple times.

Is there a way to make the new version succeed? It would be awesome to be able to fully rely on the Dask/Pandas API and not on explicit for loops.

Huh, it finishes successfully with (memory_limit='50GB', n_workers=2, threads_per_worker=1). Yet this is not how we are supposed to scale, right? If memory_limit is the issue, then we are supposed to break the work down into smaller pieces and/or spill intermediate results onto disk. Perhaps there’s a way to rewrite this to make it work with less memory per worker and more workers? Thanks!

Groupby involves a shuffle, and partitions may have to be clearly identified; depending on how your data is stored and the operation you do, sometimes two shuffle phases are required.

According to your log, it is. I suspect that you have a large number of records for a given ProductCode, which creates a huge single grouping partition, causing an OutOfMemory when gathering all the records on a single worker. groupby().apply() needs all the records of a given group to fit into memory.
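One quick way to check this (a sketch, using the df from your first post):

# How many rows does each ProductCode have? A very skewed distribution means
# one group, and therefore one worker, has to hold a huge partition.
counts = df['ProductCode'].value_counts().compute()
print(counts.head(10))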

I think the problem here is that you need all the data for a given ProductCode at once in order to be able to perform the resampling. If you only had the aggregation phase, you could write a custom Dask Aggregation and it would use much less memory.
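For illustration, the general shape of a custom Aggregation (the column and the reduction here are made up, not taken from your code):

import dask.dataframe as dd

# Each partition is reduced independently by `chunk`, the per-partition
# results are combined by `agg`, and `finalize` produces the value per group,
# so no worker ever needs a whole group in memory at once.
value_range = dd.Aggregation(
    name='value_range',
    chunk=lambda s: (s.min(), s.max()),
    agg=lambda mins, maxs: (mins.min(), maxs.max()),
    finalize=lambda mins, maxs: maxs - mins,
)

result = df.groupby('ProductCode').agg({'b': value_range})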