Why is distributed leaking memory from unused futures?

System information:

  • OS Platform and Distribution: macOS Monterey
  • Computer model: MacBook Pro (16-inch, 2019)
  • Memory: 16 GB 2667 MHz DDR4
  • Processor: 2.3 GHz 8-Core Intel Core i9
  • Python version: 3.10.4
  • Pandas version: 1.4.3
  • Dask version: 2022.7.1

I am entering the following script as a single command in IPython:

import pandas as pd
import numpy as np
from distributed import Client

client = Client()
df1_future = client.scatter(
    pd.DataFrame(np.random.randint(0, 100, (2**18, 2**8))), hash=False
)
print(f"first future: {df1_future}")

# Managed memory for 1 worker is now about 512 MiB

client.submit(lambda df: df + 1, df1_future)

# Managed memory is now 1.00 GiB

In my dashboard at http://127.0.0.1:8787/status, I see the "managed memory" for the one worker start at 512 MiB, then go up to about 1 GiB after the submit. Every time I call client.submit(lambda df: df + 1, df1_future), another 500 MiB is added to managed memory. But I'm not saving the results of these submits anywhere, so distributed should delete them. What's going wrong?
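For reference, the same thing can be checked without the dashboard. This is just a sketch (continuing from the script above): client.run passes each worker instance to a function that takes an argument named dask_worker, and dask_worker.data is the worker's in-memory key-to-data mapping.

# list the keys each worker is currently holding in memory
client.run(lambda dask_worker: list(dask_worker.data.keys()))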

I'm able to delete the first future with del df1_future, and that does free up about 500 MiB. But I can't find a way to delete the futures from the submit calls. I can see those futures' keys by printing client.futures, but constructing a future from one of those keys with Future(key1) just makes a new client-side reference (without copying the data). I can delete that future, but the underlying data doesn't get deleted.
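Roughly, the pattern I tried looks like this, with key1 standing in for one of the keys printed by client.futures (sketch only):

from distributed import Future

key1 = list(client.futures)[-1]   # for example, one of the keys created by the submit calls
f = Future(key1, client=client)   # a new client-side reference to the same task, no data copy
del f                             # deleting it does not release the worker's memory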

Note there is some kind of finicky behavior around whether the futures get deleted. I originally had print(client.futures) at the end of the script, but if I enter all of that in a single command, memory stays at about 500 MiB instead of going up to 1 GiB.

For context, Modin on Dask uses a very similar pattern of memory management. I noticed that Modin on Dask was leaking lots of memory, so I tried this small example in pure distributed.

P.S. I was unable to link to Modin because I was only allowed to put two links in my post; it's modin-project/modin on GitHub.


Bumping this up for visibility. I’m seeing some similar behavior here.

This command returns a future; it's just not stored anywhere:

client.submit(lambda df: df + 1, df1_future)

If you save and delete the future, then the memory should be freed up:

for _ in range(5):
    f = client.submit(lambda df: df + 1, df1_future)
    del f
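To double-check that the data is actually released, you can look at what the client still tracks afterwards (a rough sketch, continuing from the snippet above):

print(client.futures)   # after the del, only df1_future's key should remain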

@SultanOrazbayev saving and manually deleting the futures works for me. However, in general Python doesn’t require users to free unnamed objects. I think dask should take care of deleting futures that the client can never use. The memory management documentation linked above says:

The result of a task is kept in memory if either of the following conditions hold:

  1. A client holds a future pointing to this task. The data should stay in RAM so that the client can gather the data on demand.
  2. The task is necessary for ongoing computations that are working to produce the final results pointed to by futures. These tasks will be removed once no ongoing tasks require them.

But I would not expect the client to hold any future pointing to the unnamed and unused result of client.submit(lambda df: df + 1, df1_future). Why should the user be responsible for memory management of a future that they cannot (as far as I can tell) access?

Using temporary, unnamed objects this way is actually a very common pattern in an interactive context. For example, in a notebook cell I might have:

import pandas as pd
import numpy as np

df = pd.DataFrame(np.random.randint(0, 100, size=(int(2**20), 2**8)))
print(df + 1)

Python constructs the dataframe from df + 1 and prints it, then takes care of deleting the anonymous dataframe. Every time I repeat the print line, Python's memory usage on my Mac goes up by 2 GB, but then it goes down again immediately after.
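For comparison, I would expect the distributed version of the same pattern to behave the same way if the result is gathered inside the statement, so that no future lingers afterwards (just a sketch, not something I've taken from the docs):

# the temporary future is dropped once the statement finishes, so the worker's copy can be released
print(client.submit(lambda df: df + 1, df1_future).result())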

It's partly a Jupyter thing: each command's result is stored in the output history, and you could in principle access it later; see https://jakevdp.github.io/PythonDataScienceHandbook/01.04-input-output-history.html.

@SultanOrazbayev I see; I think you're right about the IPython kernel saving objects. I can see the objects that I thought were anonymous in Out. Furthermore, if I change my pandas snippet here to end with a bare df + 1 instead of printing it, my memory grows every time I evaluate df + 1.

Still, following from my first code snippet, del Out[1] doesn’t free the memory and client.futures still lists both futures as available. I have the same problem with the pandas snippet. Presumably the kernel is holding on to the outputs somewhere else, too. Anyway, I think it’s safe to say this is a kernel problem and not a dask problem.
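For anyone else hitting this: IPython also keeps references in _, __, and ___, so deleting a single Out entry isn't enough. Clearing the whole output cache should drop them (a sketch I haven't verified against this exact example):

# clears Out, _1, _2, ... and the _, __, ___ shortcuts
%reset -f out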

By the way, the solution from @martindurant here worked for both pandas and dask: if I just add a semicolon to the client.submit lines, the futures stay anonymous.


The names of those variables are _1, _2, etc. in Jupyter/IPython. You also have the _, __, and ___ shortcuts for the last three outputs. Short story: don't output the variables at the end of a cell/line (e.g., use the semicolon trick).
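In other words, roughly:

client.submit(lambda df: df + 1, df1_future)    # displayed, so cached as Out[n] / _n / _
client.submit(lambda df: df + 1, df1_future);   # trailing semicolon: not displayed, not cached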
