Odd refcount issue

Hi,

I’m new to dask. The objects returned from my client.submit calls are overwhelming local memory, so I’m trying to free memory as my job proceeds. However, it is clear to me that I don’t understand what is happening with the Future object’s refcount. Here are 2 code examples:

  1. using standard python
  2. using dask

The standard python code:

import gc
import sys

A = [object()]
print('point0 refcount=', sys.getrefcount(A), id(A))
id_val = -1
for obj in gc.get_objects():
    if id(obj) == id(A):
        id_val = id(obj)
        print('in gc0',  sys.getrefcount(obj), id_val)
assert id_val >= 0
del(A)
gc.collect()
for obj in gc.get_objects():
    if id(obj) == id_val:
        assert 0

print('Done 0')
def foo():
    A = [object()]
    print('point1 refcount=', sys.getrefcount(A), id(A))
    for obj in gc.get_objects():
        if id(obj) == id(A):
            id_val = id(obj)
            print('in gc1', sys.getrefcount(obj), id_val)
    del(A)
    gc.collect()
    for obj in gc.get_objects():
        if id(obj) == id_val:
            assert 0

    print('Done foo')

foo()

A = [object()]
B = A
print('point2 refcount=', sys.getrefcount(A), id(A))
id_val = -1
for obj in gc.get_objects():
    if id(obj) == id(A):
        id_val = id(obj)
        print('in gc2', id_val, id(A), sys.getrefcount(obj))
assert id_val >= 0
del(A)
gc.collect()
for obj in gc.get_objects():
    if id(obj) == id_val:
        assert 0, 'success'

print('Done 2')

Every time I run this code, it trips on the last assert, as it should.
The output looks like:

python scratch/test_ref.py 
point0 refcount= 2 140053461679552
in gc0 4 140053461679552
Done 0
point1 refcount= 2 140053461658752
in gc1 4 140053461658752
Done foo
point2 refcount= 3 140053461658752
in gc2 140053461658752 140053461658752 5
Traceback (most recent call last):
  File "scratch/test_ref.py", line 49, in <module>
    assert 0, 'success'
AssertionError: success

The dask code:

import gc
import sys
from dask_jobqueue import SLURMCluster
from dask.distributed import Client

# start 1 worker with 1 core and 1GB of memory
scheduler_options = {
    "dashboard_address": ":8099",
}
cluster = SLURMCluster(
    cores=1, memory='1GB',
    queue="blah", n_workers=1, scheduler_options=scheduler_options,
    log_directory='/tmp/.logs', local_directory='/tmp/.dask',
    walltime="0:10:00",  # if you don't specify this, the default is 30 minutes, after which Slurm will kill your workers
)

client = Client(cluster)
def square(x):
    return x ** 2

A = client.submit(square, 10)
print('point0 refcount=', sys.getrefcount(A), id(A))
id_val = -1
for obj in gc.get_objects():
    if id(obj) == id(A):
        id_val = id(obj)
        print('in gc0', sys.getrefcount(obj), id_val)
assert id_val >= 0
B = A.result()
print('point0_1 refcount=', sys.getrefcount(A), id(A))
client.cancel(A)
del(A)
gc.collect()
for obj in gc.get_objects():
    if id(obj) == id_val:
        assert 0
#client.shutdown()
print('Done 0')
def foo():
    client.restart() #= Client(cluster)
    A = client.submit(square, 10)
    print('point1 refcount=', sys.getrefcount(A), id(A))
    for obj in gc.get_objects():
        if id(obj) == id(A):
            id_val = id(obj)
            print('in gc1', sys.getrefcount(obj), id_val)
    B = A.result()
    client.cancel(A)
    del(A)
    gc.collect()
    for obj in gc.get_objects():
        if id(obj) == id_val:
            assert 0
    #client.shutdown()
    print('Done foo')

foo()

client.restart() #= Client(cluster)
A = client.submit(square, 10)
C = A
print('point2 refcount=', sys.getrefcount(A), id(A))
id_val = -1
for obj in gc.get_objects():
    if id(obj) == id(A):
        id_val = id(obj)
        print('in gc2', id_val, id(A), sys.getrefcount(obj))
assert id_val >= 0
B = A.result()
client.cancel(A)
del(A)
gc.collect()
for obj in gc.get_objects():
    if id(obj) == id_val:
        assert 0, 'success'

print('Done 2')

Different runs trip on either the first or second assert or they succeed. What am I doing wrong?

Typical output looks like:

point0 refcount= 2 140336482667872
in gc0 4 140336482667872
point0_1 refcount= 2 140336482667872
Done 0
point1 refcount= 2 140336931528176
in gc1 4 140336931528176
Traceback (most recent call last):
  File "scratch/test_ref_dask.py", line 56, in <module>
    foo()
  File "scratch/test_ref_dask.py", line 52, in foo
    assert 0
AssertionError


Hi @liberabaci,

Well, I have to admit I’m not an expert in Future handling internals. Maybe cc @crusaderky @fjetter.

Anyway, is the problem local memory, i.e. on the Client side? If so, you don’t really need to care about the Future references; you just need to delete the objects you no longer need.

Correct, the issue is local memory. I just need to delete the objects, but when I try to do so it looks like dask still holds a reference internally, so the object cannot be garbage collected.
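
One thing that may be worth ruling out before blaming an internal reference: in CPython, id() is the object's memory address, and an address can be reused by a new object once the old one has been freed. A check of the form `id(obj) == id_val` after `del` can therefore match an unrelated object, which might explain why the asserts trip only on some runs. A weak reference is a more direct test of whether a specific object is gone. The sketch below uses plain Python only; `Payload` and `is_collected` are hypothetical names for illustration, and whether a dask Future can be weakly referenced in your installed version is an assumption I have not checked.

```python
import gc
import weakref


class Payload:
    """Hypothetical stand-in for a large result object held on the client."""

    def __init__(self, size):
        self.data = bytearray(size)


def is_collected(ref):
    """Return True once the object behind a weak reference has been freed."""
    return ref() is None


obj = Payload(1024)
ref = weakref.ref(obj)  # a weak reference does not keep obj alive
alias = obj             # a second strong reference, like B = A above

del obj
gc.collect()
still_alive = not is_collected(ref)  # alias still holds the object

del alias
gc.collect()
freed = is_collected(ref)  # no strong references remain

print(still_alive, freed)
```

Unlike the id() comparison, the weak reference is tied to one particular object, so it cannot be fooled by a later object that happens to land at the same address. Note that built-in lists cannot be weakly referenced, which is why the sketch uses a small class instead of `[object()]`.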

cc @crusaderky @fjetter