Memory Leakage on single worker on merged DataFrame (after task completion)

Hi Dask community, some help needed here from a new Dask user,

I am trying to replicate itertools cartesian product like functionality at large scale using Dask DataFrame merge on two integer arrays. When I do this, it results in memory errors due to large amounts of unmanaged memory, regardless of the number of partitions I choose for the final Dask array, or any reduction operations I compute on the large merged DataFrame.

The memory errors always occur at the end of the computation, after the final task has completed, and the unmanaged memory is always on a single worker.

This occurs even if I convert to Dask array, store to parquet, or calculate a mean of the dataframe as the final output.

A similar memory error and large usage of unmanaged memory also occurs if I compute the length of the array when converting to Dask array, or compute chunk sizes.

I’ve been using Dask arrays in the past and have never experienced this issue with arrays, just DataFrames.

The code I’m using is as follows:

from dask_saturn import SaturnCluster
from dask.distributed import Client
from dask.distributed import Client, LocalCluster
client = Client(LocalCluster(n_workers=1,
                       threads_per_worker=16,
                       memory_limit='110GB'))

import dask.array as da
import dask.dataframe as dd
import numpy as np

n = 100000

inputarray = da.random.randint(0,10,size= (n, 5))

it1_temp = dd.from_array(inputarray).repartition(partition_size=100000)
it1_temp["key"] = 1

it2_temp = dd.from_array(inputarray).repartition(partition_size=100000)
it2_temp["key"] = 1

result = dd.merge(it1_temp, it2_temp, on="key", npartitions = 500)

reshead = result.mean()

reshead.compute()

The error message is:

2022-11-01 18:38:13,627 - distributed.worker_memory - WARNING - Unmanaged memory use is high. This may indicate a memory leak or the memory may not be released to the OS; see https://distributed.dask.org/en/latest/worker.html#memtrim for more information. -- Unmanaged memory: 75.43 GiB -- Worker memory limit: 102.45 GiB

Does anyone know why this might be happening after the tasks have finished? Or how I can avoid this memory leakage?

Thanks!