Memory Leakage on single worker on merged DataFrame (after task completion)

Hi Dask community, some help needed here from a new Dask user,

I am trying to replicate itertools-style Cartesian product functionality at large scale by using a Dask DataFrame merge on two integer arrays. When I do this, it results in memory errors due to large amounts of unmanaged memory, regardless of the number of partitions I choose for the merged result or which reduction operations I compute on the large merged DataFrame.
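Concretely, at a small (pandas-size) scale, what I'm after is equivalent to this toy sketch (just for illustration, the names are mine):

import itertools
import pandas as pd

# Toy illustration of the intended result: the Cartesian product of two
# tables, i.e. the same pairs that itertools.product would generate.
a = pd.DataFrame({"x": [1, 2]})
b = pd.DataFrame({"y": [3, 4]})

via_itertools = list(itertools.product(a["x"], b["y"]))   # [(1, 3), (1, 4), (2, 3), (2, 4)]

# The "constant key" merge trick produces the same combinations
via_merge = a.assign(key=1).merge(b.assign(key=1), on="key").drop(columns="key")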

The memory errors always occur at the end of the computation, after the final task has completed, and the unmanaged memory is always on a single worker.

This occurs even if I convert to Dask array, store to parquet, or calculate a mean of the dataframe as the final output.

A similar memory error and large amount of unmanaged memory also occur if I compute the length of the array when converting to a Dask array, or compute chunk sizes.

I’ve used Dask arrays in the past and have never experienced this issue with arrays, just with DataFrames.

The code I’m using is as follows:

from dask.distributed import Client, LocalCluster

# Single local worker with many threads and a high memory limit
client = Client(LocalCluster(n_workers=1,
                             threads_per_worker=16,
                             memory_limit='110GB'))

import dask.array as da
import dask.dataframe as dd
import numpy as np

n = 100000

inputarray = da.random.randint(0, 10, size=(n, 5))

# Add a constant key to both tables so the merge produces a Cartesian product
it1_temp = dd.from_array(inputarray).repartition(partition_size=100000)
it1_temp["key"] = 1

it2_temp = dd.from_array(inputarray).repartition(partition_size=100000)
it2_temp["key"] = 1

result = dd.merge(it1_temp, it2_temp, on="key", npartitions=500)

reshead = result.mean()

reshead.compute()

The error message is:

2022-11-01 18:38:13,627 - distributed.worker_memory - WARNING - Unmanaged memory use is high. This may indicate a memory leak or the memory may not be released to the OS; see https://distributed.dask.org/en/latest/worker.html#memtrim for more information. -- Unmanaged memory: 75.43 GiB -- Worker memory limit: 102.45 GiB

Does anyone know why this might be happening after the tasks have finished? Or how I can avoid this memory leakage?

Thanks!


Hi, I am facing the same issue. Did you manage to solve this?

Hi @goodmanngl, @gameliee, welcome to Dask Discourse!

Sorry @goodmanngl that your post has been unanswered for so long…

The problem with the code above is that Dask uses map/reduce-like logic for merges, and puts all the rows of the DataFrame that share the same key into a single partition. You only have one key, because you want to use it to perform a Cartesian product, so the entire product ends up in a single partition, on a single worker: 75 GiB of it.

I guess you need to find another more parallelized way to perform this product.
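One possibility (just a sketch, not something I've benchmarked, and the function name is mine, not part of the Dask API) would be to build the product pairwise from the partitions yourself, for example with dask.delayed and a pandas cross merge, so that no single task has to hold the full product:

import dask
import dask.dataframe as dd
import pandas as pd

def pairwise_cross_product(left, right):
    # Cross-join each pair of partitions separately, so the product is
    # spread over len(left_parts) * len(right_parts) tasks instead of
    # being collected into one partition.
    left_parts = left.to_delayed()
    right_parts = right.to_delayed()

    @dask.delayed
    def cross(a, b):
        # pandas >= 1.2 supports how="cross" for a Cartesian product
        return a.merge(b, how="cross")

    pieces = [cross(a, b) for a in left_parts for b in right_parts]
    meta = left._meta.merge(right._meta, how="cross")  # _meta is internal, used here for convenience
    return dd.from_delayed(pieces, meta=meta)

You could then call .mean() or .to_parquet() on the returned DataFrame, and each output partition would only be as big as the product of one pair of input partitions.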

Thank you,

By saying “the entire product is then put in a single partition”, do you mean that in the line result = dd.merge(it1_temp, it2_temp, on="key", npartitions = 500), the result is actually in a single partition, and that npartitions=500 has no effect?

Thanks again

As stated in the documentation:

The ideal number of output partitions.

Here Dask has only one key, so it cannot create as many partitions.
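You can check this on the snippet above by counting the rows in each output partition of result (just a quick diagnostic sketch):

# Count rows in each output partition of the merged DataFrame.
# With a single join key, one partition holds (almost) all the rows
# and the others are empty, whatever npartitions was asked for.
rows_per_partition = result.map_partitions(len).compute()
print(rows_per_partition)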

Thank you for the clarification.

Since my data for a single key is too big, I guess Dask's merge would not do the job for me. In your experience, could you suggest something else for me to try?