Hi,
I am trying to create a dask array from datasets shared by different clients. The producers use the published-datasets API, and the names of the published datasets are passed to a subscriber client, which builds the dask.array with `dask.array.block()`. The subscriber runs in a loop, so multiple dask.arrays are created, and eventually the scheduler process crashes with an out-of-memory (OOM) error.
I try to unpublish the datasets after I have used them to build the dask.array; however, the scheduler still does not release something that is created on each iteration.
A minimal reproducer is the following:
The subscriber process is this (using a distributed scheduler):
def current_memory():
    """Return the resident set size (RSS) of the calling process in MiB.

    The subscriber runs this on the scheduler via
    ``client.run_on_scheduler(current_memory)`` to log scheduler memory
    after each iteration.
    """
    import os

    import psutil

    process = psutil.Process(os.getpid())
    # memory_info().rss is in bytes; convert to MiB.
    return process.memory_info().rss / (1024 * 1024)
def runmain():
    """Consume published datasets and maintain a running mean over them.

    Connects to the scheduler described by ``dask_file.json`` and blocks on
    the ``'mydata'`` Pub/Sub topic. Each message is a dict. While its
    ``'continue'`` entry is true, it also carries:

    * ``'dask_string'`` -- Python source that assigns ``data``, the list of
      blocks passed to ``da.block`` (the publisher sends a string that reads
      ``client.datasets[...]``).
    * ``'client_datasets'`` -- names of published datasets to unpublish once
      they are no longer needed.

    A message whose ``'continue'`` is false ends the loop. After that, the
    average of the running mean is printed (if any data arrived), the
    remaining datasets are unpublished, and the client is closed.
    """
    import gc
    import time

    import numpy as np
    import dask.array as da
    from dask.distributed import Client, Sub

    scheduler_file = 'dask_file.json'
    client = Client(scheduler_file=scheduler_file)
    try:
        count = 0
        mean = None
        # Names received in one iteration are unpublished at the start of the
        # next one; the final batch is released after the loop.
        datasets_to_delete_old = []
        # Sub will block until a result is published from a simulation client
        sub = Sub(name='mydata', client=client)
        mydata = sub.get()
        print('Mem: returned from: sub.get() ', flush=True)
        while mydata['continue']:
            data_str = mydata['dask_string']
            datasets_to_delete = list(mydata['client_datasets'])
            print(' Subscribe count', str(count), flush=True)
            # Remove datasets used in previous iteration
            for ds in datasets_to_delete_old:
                del client.datasets[ds]
            client.run_on_scheduler(gc.collect)
            # SECURITY: exec() runs source received over Pub/Sub -- only use
            # this with trusted publishers. An explicit namespace avoids
            # mutating module globals ("global data").
            namespace = {'client': client, 'da': da, 'np': np}
            exec(data_str, namespace)
            not_fut_da = da.block(namespace['data'])
            if mean is None:
                mean = da.zeros_like(not_fut_da, dtype=np.float64)
                print('STIME: type(mean) : ', type(mean), flush=True)
                print('STIME: type(mean).shape : ', mean.shape, flush=True)
                print('STIME: type(mean).blocks.ravel() : ', mean.blocks.ravel(), flush=True)
                mean = mean.persist()
            count += 1
            # Incremental mean: mean_n = mean_{n-1} + (x_n - mean_{n-1}) / n
            delta = not_fut_da - mean
            mean = (mean + delta / count).persist()
            del delta, not_fut_da
            print('Scheduler memory: ', client.run_on_scheduler(current_memory), flush=True)
            datasets_to_delete_old = datasets_to_delete
            # this will block until a result is published from a simulation client
            mydata = sub.get()
        if mean is not None:
            print('da.average(mean)).compute() : ', da.average(mean).compute())
        # Deletion lags one iteration, so the last batch is still published here.
        for ds in datasets_to_delete_old:
            del client.datasets[ds]
        print(' Exiting Simulation', flush=True)
        time.sleep(10)
    finally:
        client.close()
if __name__ == "__main__":
    # Run the subscriber only when executed as a script, not on import.
    runmain()
Publisher code (runs a loop that publishes datasets):
def runmain():
    """Publish a sequence of random NumPy arrays for the subscriber.

    Runs 20 iterations. Each one connects a fresh ``Client`` (simulating a
    separate producer), publishes one array under a unique dataset name, and
    sends a message on the ``'mydata'`` Pub/Sub topic containing:

    * ``'dask_string'`` -- source that rebuilds ``data`` from
      ``client.datasets`` on the subscriber side;
    * ``'client_datasets'`` -- the dataset names published in this
      iteration, so the subscriber can unpublish them once consumed.

    A final ``{'continue': False}`` message tells the subscriber to stop.
    """
    import time

    import numpy as np
    import dask.array as da  # noqa: F401 -- unused here; kept for the da-based variants
    from dask.distributed import Client, Pub

    numpyLength = 19999850
    scheduler_file = 'dask_file.json'
    magicnum_str = str(np.random.randint(1000000, size=1)[0])
    for iteration in range(20):
        print('Running Publish iteration: ', str(iteration), flush=True)
        client = Client(scheduler_file=scheduler_file)
        try:
            # Create a numpy array
            ND_a = np.random.rand(numpyLength)
            # Set the keys to reference the data in client.datasets[]
            md_key_a = magicnum_str + 'a_' + str(iteration)
            client.datasets[md_key_a] = ND_a
            # Source the subscriber exec()s to build the da.block() input;
            # repr() emits a correctly quoted string literal for the key.
            data_str = f'data = [client.datasets[{md_key_a!r}]]'
            # Bind the Pub explicitly to this iteration's client.
            pub = Pub(name='mydata', client=client)
            to_stats_server = {
                'iteration': iteration,
                'dask_string': data_str,
                # Without these names the subscriber has nothing to
                # unpublish, so every published array would stay published.
                'client_datasets': [md_key_a],
                'continue': True,
            }
            pub.put(to_stats_server)
            time.sleep(1.0)
        finally:
            client.close()

    # The last loop iteration's Pub belongs to a client that is now closed,
    # so send the stop message through a fresh client and Pub.
    client = Client(scheduler_file=scheduler_file)
    try:
        pub = Pub(name='mydata', client=client)
        pub.put({'continue': False})
        time.sleep(10)
    finally:
        client.close()
    print(' Exiting Publish datasets', flush=True)
if __name__ == "__main__":
    # Run the publisher only when executed as a script, not on import.
    runmain()
Some interesting points
On the publisher side:
- If I create and publish a dask.array using `da.random` (instead of using a NumPy array), there is no OOM issue:
# Variant: generate the data directly as a dask array, persist it on the
# cluster, and publish the persisted array under md_key_a.
ND_a_arr = da.random.random_sample(numpyLength, chunks=numpyLength)
ND_a_arr = client.persist(ND_a_arr)
client.datasets[md_key_a] = ND_a_arr
- If I create the dask.array from a NumPy array using `da.from_array()`, the OOM issue remains:
ND_a = np.random.rand(numpyLength)
# name=False gives the array a random name instead of hashing its contents.
# NOTE(review): from_array embeds ND_a in the task graph, so the whole array
# is presumably serialized and sent through the scheduler -- a likely source
# of the scheduler-side allocations seen in serialize.py; confirm.
ND_a_arr = da.from_array(ND_a, chunks=numpyLength, name=False)
ND_a_arr = client.persist(ND_a_arr)
client.datasets[md_key_a] = ND_a_arr
Sharing data that is held in NumPy arrays is my use case (the second bullet point above). Any ideas on how to keep memory at a sensible level when sharing data via NumPy arrays would be appreciated.
System details:
dask, version 2024.2.0
Python 3.10.4
CentOS Linux release 7.9.2009 (Core)
I have found a solution: creating the dask.array with `da.from_delayed()` prevents the OOM error.
ND_a = np.random.rand(numpyLength)
# scatter() places the array in distributed memory and returns a Future.
# NOTE(review): presumably this keeps the array out of the task graph the
# scheduler stores, unlike da.from_array -- confirm.
future_a = client.scatter(ND_a)
# Derive shape/dtype from the source array so they cannot drift from the data.
ND_a_arr = da.from_delayed(future_a, shape=ND_a.shape, dtype=ND_a.dtype)
ND_a_arr = client.persist(ND_a_arr)
client.datasets[md_key_a] = ND_a_arr
The above method seems to work without needing to call `client.run_on_scheduler(gc.collect)` on the scheduler process.
Question
Why does the `da.from_array()` version cause OOM issues?
The tracemalloc tool points me to:
.local/lib/python3.10/site-packages/distributed/protocol/serialize.py:97: size=1831 MiB (+305 MiB), count=36 (+6), average=50.9 MiB
However, I cannot get a stack trace showing what calls this, so I have not dug further.