Scheduler out of memory error when sharing datasets

Hi,
I am trying to create a dask array from datasets shared by different clients. A publisher client shares data via the published-datasets mechanism and passes the dataset names to a subscriber client, which assembles a dask.array with dask.array.block(). The clients run in a loop, so multiple dask.arrays are created, and eventually the scheduler process crashes with an OOM error.
I remove the published datasets after using them to build the dask.array, but the scheduler still does not release something that is created along the way.

A minimal reproducer is the following:

The subscriber process is this (using a distributed scheduler):

def current_memory():
    import sys, os
    import psutil
    process = psutil.Process(os.getpid())
    mem = process.memory_info().rss / (1024 * 1024)  # MiB
    return mem
    
def runmain():
    import numpy as np
    from dask.distributed import Client, Sub
    import dask.array as da
    import gc
    import time
   
    scheduler_file='dask_file.json'
    client = Client(scheduler_file=scheduler_file) 

    count = 0
    datasets_to_delete_old = []

    # Sub will block until a result is published from a simulation client
    sub = Sub(name='mydata', client=client)
    mydata = sub.get()  
    print('Mem: returned from: sub.get() ', flush=True)
    data_str = mydata['dask_string']
    datasets_to_delete = mydata['client_datasets']

    while mydata['continue']:
        print('Subscribe count', count, flush=True)
        
        # Remove datasets used in previous iteration
        for ds in datasets_to_delete_old:
            del client.datasets[ds]
            client.run_on_scheduler(gc.collect)
            
        exec( "global data; " + data_str )
        not_fut_da = da.block( data )
        
        if count == 0:
            mean  = da.zeros_like(not_fut_da, dtype=np.float64)
            print('STIME: type(mean)          : ', type(mean), flush=True)
            print('STIME: mean.shape          : ', mean.shape, flush=True)
            print('STIME: mean.blocks.ravel() : ', mean.blocks.ravel(), flush=True)
            mean  = mean.persist()
            
        count += 1
        delta = not_fut_da - mean
        mean += delta / count
        mean = mean.persist()
        
        del delta
        del not_fut_da
        print('Scheduler memory: ', client.run_on_scheduler(current_memory), flush=True)
        
        # this will block until a result is published from a simulation client
        mydata = sub.get()  
        if mydata['continue']:
            data_str = mydata['dask_string']
            datasets_to_delete_old = datasets_to_delete[:]
            datasets_to_delete = mydata['client_datasets']
       
    print('da.average(mean).compute(): ', da.average(mean).compute())
    print('Exiting Simulation', flush=True)
    time.sleep(10)
    
if __name__ == '__main__':
    runmain()

Publisher code - runs a loop that publishes datasets

def runmain():
    import numpy as np 
    from dask.distributed import Client, Pub
    import dask.array as da
    import time
    
    numpyLength = 19999850
    
    scheduler_file='dask_file.json'
    
    magicnum_str = str(np.random.randint(1000000, size=1)[0])
    count = 0

    for iteration in range(0, 20):

        print('Running Publish iteration:', iteration, flush=True)
        client = Client(scheduler_file=scheduler_file)
        
        mydatadict_scatterd = []

        # Create a numpy array
        ND_a = np.random.rand(numpyLength)
        
        # Set the keys to reference the data in client.datasets[]
        md_key_a = magicnum_str + 'a_' + str(iteration)
        
        client.datasets[md_key_a] = ND_a
        # Record the key so the subscriber can delete the dataset later
        mydatadict_scatterd.append(md_key_a)
        
        # Set up a string to use to construct a dask.Array using da.block()
        data_str = "data = [client.datasets['" + md_key_a + "']]"
        
        # Pass to the subscriber:  
        # - The da.block() string 
        # - The keys of published datasets so that they can be deleted
        pub = Pub(name='mydata')
        to_stats_server = {'iteration': iteration, 'dask_string': data_str, 'client_datasets': mydatadict_scatterd, 'continue': True}
        pub.put(to_stats_server) 
        
        
        time.sleep(1.0)
        client.close()
        mydatadict_scatterd.clear()
    
    client = Client(scheduler_file=scheduler_file)
    pub = Pub(name='mydata', client=client)
    to_stats_server = {'continue': False}
    pub.put(to_stats_server)
    time.sleep(10)
    
    print('Exiting Publish datasets', flush=True)
    
if __name__ == '__main__':
    runmain()

Some interesting points

On the publisher side:

  • If I create and publish a dask.array (instead of a numpy array) using da.random, there is no OOM issue.
    ND_a_arr = da.random.random_sample(numpyLength, chunks=(numpyLength))
    ND_a_arr = client.persist(ND_a_arr)
    client.datasets[md_key_a] = ND_a_arr
  • If I create the dask.array from a numpy array using da.from_array(), the OOM issue remains.
    ND_a = np.random.rand(numpyLength)
    ND_a_arr = da.from_array(ND_a, chunks=(numpyLength), name=False)
    ND_a_arr = client.persist(ND_a_arr)
    client.datasets[md_key_a] = ND_a_arr

Sharing data that arrives as Numpy arrays is my use case (the second point above). Any ideas on how to keep memory at a sensible level when sharing data via Numpy arrays would be appreciated.

System details:

dask, version 2024.2.0
Python 3.10.4
CentOS Linux release 7.9.2009 (Core)

I have found a solution

Creating the dask.array with da.from_delayed() prevents the OOM error.

    ND_a = np.random.rand(numpyLength)
    future_a = client.scatter(ND_a)
    ND_a_arr = da.from_delayed(future_a, (numpyLength,), dtype=float)
    ND_a_arr = client.persist(ND_a_arr)
    client.datasets[md_key_a] = ND_a_arr

The above method seems to work without the need to call client.run_on_scheduler(gc.collect) on the scheduler process.

Question

Why does the da.from_array() version cause OOM issues?

The tracemalloc tool leads me to:

.local/lib/python3.10/site-packages/distributed/protocol/serialize.py:97: size=1831 MiB (+305 MiB), count=36 (+6), average=50.9 MiB

However, I cannot get a stack trace of what is calling this, so have not dug further.
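
For anyone wanting to dig further, here is a minimal sketch of how a traceback for that allocation might be captured; it only uses standard-library tracemalloc calls, executed on the scheduler process via client.run_on_scheduler (the 25-frame depth and top-3 limit are arbitrary choices):

    import tracemalloc

    def start_tracing(dask_scheduler=None):
        tracemalloc.start(25)   # keep up to 25 frames per allocation

    def top_tracebacks(dask_scheduler=None, limit=3):
        snapshot = tracemalloc.take_snapshot()
        stats = snapshot.statistics('traceback')
        return ["\n".join(stat.traceback.format()) for stat in stats[:limit]]

    client.run_on_scheduler(start_tracing)
    # ... let a few publish/subscribe iterations run ...
    for tb in client.run_on_scheduler(top_tracebacks):
        print(tb, flush=True)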

The scheduler stores the graphs of the objects you send to the cluster; in the case of published ones, these persist after the creating client is done with them (that’s the point). When the array is made from in-memory numpy data, the graph contains that data. This is in contrast to da.arrays created via load-from-disk, create-random, zeros, ones, range, etc., which only need to send the function to be called.
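
As a rough illustration of that difference (a sketch only; exact sizes depend on the Dask version and how the graph is actually serialised):

    import pickle
    import numpy as np
    import dask.array as da

    n = 1_000_000
    from_numpy = da.from_array(np.random.rand(n), chunks=n)  # graph embeds the ndarray itself
    generated = da.random.random(n, chunks=n)                # graph holds only the task that creates it

    def graph_bytes(arr):
        # serialised size of the task graph, roughly what gets shipped to the scheduler
        return len(pickle.dumps(dict(arr.__dask_graph__())))

    print(graph_bytes(from_numpy))  # on the order of n * 8 bytes: the data is in the graph
    print(graph_bytes(generated))   # a few kilobytes: just the recipe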

Aside from changing your workflow to avoid sending numpy data to the cluster (preferred), you might be able to persist (or scatter) your data to the cluster first, so that the scheduler only needs to know the data’s unique key rather than how to make it.


Thanks for the reply and for describing the differences in how the graphs are produced. That clarifies why I have been having issues and why the da.from_delayed() version works without excessive memory use. I was getting warnings about large graphs being sent to the scheduler, which have now gone.

So, in the initial version of my code, the Numpy data is part of the graph. When I use the data on the subscriber process, I call mean = mean.persist(), which I thought computes up to the current stage of the graph, so that references to the shared Numpy data could then be deleted and garbage collected. Perhaps I have an incorrect picture of how Dask graphs are processed. Is there a way to remove the shared data after it is used?

You probably want client.persist() instead, and the thing you publish is the result of this call.
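
A minimal sketch of that suggestion, assuming a connected client and placeholder names arr and key for the collection and the dataset key:

    arr = client.persist(arr)   # explicit, client-based persist; returns a new collection backed by futures
    client.datasets[key] = arr  # publish the persisted result, not the pre-persist graph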

Ah, this is where the Dask API starts to be too much for me. There are client-based persist methods too, which I had used but have now removed, and I am not sure how they differ. Coding by combinatorial exploration is not a time-efficient way forward for me!

It looks like I need one more thing to ensure stability - a wait() call after the persist:

    from dask.distributed import wait

    ND_a = np.random.rand(numpyLength)
    future_a = client.scatter(ND_a)
    ND_a_arr = da.from_delayed(future_a, (numpyLength,), dtype=float)
    ND_a_arr = client.persist(ND_a_arr)
    wait(ND_a_arr)  # add this - seems to stop a race condition
    client.datasets[md_key_a] = ND_a_arr

Further testing has led me to the following recipe:

    ND_a = np.random.rand(numpyLength)
    future_a = client.scatter(ND_a)
    wait(future_a)
    ND_a_arr = da.from_delayed(future_a, (numpyLength,), dtype=float)
    wait(ND_a_arr)
    ND_a_arr = client.persist(ND_a_arr)
    # wait(ND_a_arr) # this seemed to have no effect
    client.datasets[md_key_a] = ND_a_arr
