Memory fills up when computing dataframe-mean with 67 million rows

Hi,
I ran into this issue while analyzing multiple dataframes, each with around 67 million rows. I can call compute() on any individual df, or export it with to_csv. I use a for loop to create 50 dfs and append them all to a list (I know a for loop is not the best way to use Dask; I am still figuring it out). Then I concatenate the list of 50 dfs into one df with 50 columns and take its mean. However, I cannot call compute() on the final dataframe-mean, even though it is the same size as each individual df, for which compute() works perfectly. The script gets killed by the OS once it takes up 95% of its allocated memory.

I am not sure what is happening. Is there a workaround?
Thank you!

import dask.dataframe as dd
import dask.array as da
Results = []

for i in range(50):
    # Create a 1D dask array with 67108864 elements in a single chunk
    fft_input = da.random.randint(low=-20, high=100, size=(67108864,)).rechunk(chunks=(67108864,))
    data = dd.from_dask_array(fft_input)  # Convert the array to a dataframe; data.compute() works fine
    Results.append(data)                  # Store the dataframe in the list

# The list contains 50 separate dataframes; concatenate them into one dataframe with 50 columns
allResult = dd.concat(Results, axis=1)
dfff = allResult.mean(axis=1)  # Take the row-wise mean of the concatenated dataframe
print(dfff)
dfff.compute()                 # This is when the script gets killed

Here is the result of the above code

Dask Series Structure:
npartitions=1
0           float64
67108863        ...
dtype: float64
Dask Name: dataframe-mean, 402 tasks

Process finished with exit code 137 (interrupted by signal 9: SIGKILL)


Environment:

Dask version: 2022.2.1
Python version: 3.8
Operating System: Linux Ubuntu 18.04
Memory: 39.2 GB
OS type: 64 bit
Install method (conda, pip, source): pip
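
For scale, a rough size estimate may help when reading the numbers above. This is only a back-of-the-envelope sketch: it assumes 8-byte values (int64 input, float64 output), which is an assumption about this setup, and it ignores any temporary copies made by pandas or Dask.

```python
# Back-of-the-envelope size estimate (assumes 8 bytes per value; overheads ignored)
ROWS = 67_108_864        # rows per dataframe, from the snippet above
COLS = 50                # number of dataframes concatenated side by side
BYTES_PER_VALUE = 8      # assumption: int64 / float64


def gib(n_bytes):
    """Convert a byte count to GiB."""
    return n_bytes / 2**30


one_column_gib = gib(ROWS * BYTES_PER_VALUE)
all_columns_gib = gib(ROWS * COLS * BYTES_PER_VALUE)

print(f"one column:  {one_column_gib:.1f} GiB")
print(f"50 columns:  {all_columns_gib:.1f} GiB")
```

Under these assumptions, one column is 0.5 GiB and all 50 columns together are 25 GiB, before any intermediate copies.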

Hi @ethanne and welcome to discourse!

I think the issue here is with the for loop, and the large task graph that Dask must navigate to calculate the mean, rather than the dataset itself being too large to fit into memory. Rewriting the snippet and comparing the resulting task graphs, we can see the immediate benefit of removing the da.rechunk and dd.concat operations:

import dask.dataframe as dd
import dask.array as da
# new method
ddf = dd.from_array(da.random.randint(low=-20, high=100, size=(67, 5)))
# get number of partitions
print(ddf.npartitions)
result = ddf.mean(axis=1)
result.visualize()

# for loop method
Results = []
for i in range(5):
    fft_input = da.random.randint(
        low=-20, high=100, size=(67,)
    ).rechunk((67,))
    data = dd.from_dask_array(fft_input)
    Results.append(data)
  
allResult = dd.concat(Results, axis=1)
# get number of partitions
print(allResult.npartitions)
dfff = allResult.mean(axis=1)
dfff.visualize()

In the resulting task graphs, result.visualize() (left) has only three steps, whereas dfff.visualize() (right) has five branches running in parallel (one per iteration of the for loop) plus two final steps.

We can scale this up and run it in one line, which ran locally on my laptop in ~3 seconds:

import dask.dataframe as dd
import dask.array as da

result = dd.from_array(
    da.random.randint(low=-20, high=100, size=(67_000_000, 50))
).mean(axis=1).compute()
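
If the columns cannot be generated as one 2-D array (for example, when each one is produced separately), a possible variation is to combine them at the array level with `da.stack` and to chunk along the rows, so that no single chunk has to hold every row. This is a sketch under assumptions, not a tested fix for the original problem: the sizes are scaled down, and `CHUNK_ROWS` is a hypothetical value.

```python
import dask.array as da

N_ROWS = 1_000      # small stand-in for 67_108_864
N_COLS = 5          # small stand-in for 50
CHUNK_ROWS = 250    # hypothetical chunk size; tune to the available memory

# Each 1-D array stands in for one per-iteration result
columns = [
    da.random.randint(low=-20, high=100, size=(N_ROWS,), chunks=(CHUNK_ROWS,))
    for _ in range(N_COLS)
]

# Stack into an (N_ROWS, N_COLS) array, then reduce across the columns
stacked = da.stack(columns, axis=1)
row_means = stacked.mean(axis=1).compute()

print(row_means.shape)
```

Chunking along the rows means each task only touches `CHUNK_ROWS` rows of every column at a time, so peak memory is governed by the chunk size rather than by the full row count.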