Sticking strictly to N workers and releasing resources

I’m trying to run a dask.delayed function which is highly computationally and memory intensive, and therefore prone to OOM crashes and over-utilization if not scheduled properly.

I’m creating dask.delayed functions and gathering them synchronously to avoid over-utilization:

import dask
import numpy as np
import pandas as pd
from dask.distributed import Client, LocalCluster

@dask.delayed
def intense_func(arg1): # very large inputs (objects containing datasets)

    # do some intense work

    return pd.DataFrame(...) # very small output (only one row)

runs=10
# 8-core machine
num_workers=2
threads_per_worker=4

cluster = LocalCluster(
    n_workers=num_workers,
    threads_per_worker=threads_per_worker,
    resources = {'foo':1},
    processes=False,
)
client = Client(cluster)

task_graph = []
for run in range(runs):
    # generate dataset based on random parameters
    irand1,irandn,jrand1,jrandn,krand1,krandn = np.random.randn(...)
    subvolume = volume[irand1:irandn,jrand1:jrandn,krand1:krandn]
    
    task_graph.append(intense_func(subvolume))
    
df = dask.compute(*task_graph, resources={'foo':1}) # 'foo' doesn't seem right, but it doesn't schedule properly without it ¯\_(ツ)_/¯
df = pd.concat(df).reset_index(drop=True)

Currently, the program is successfully scheduling the functions to run synchronously (i.e. no worker is running more than one task at a time). However, these are being spawned on different processes, causing memory to accumulate and crash the machine with an OOM error. (Related note: before the failure I get warnings like this: distributed.utils_perf - WARNING - full garbage collections took 17% CPU time recently (threshold: 10%).)

Currently my tasks appear to be spawned on different processes, though no more than two execute at a time. This only solves the over-utilization issue, but what I actually want is something like this:

[image: the desired task stream]

Which would probably also solve the OOM errors. Is it possible to only allow Dask to spawn two workers, and only allow these two workers to do the work, without creating new processes beyond these two workers?

Thanks in advance 🙂

(Originally posted here: python - Dask: How to submit jobs to only two processes in a LocalCluster? - Stack Overflow)

Hi @TheGitPuller, welcome to Dask Discourse!

There might be some misunderstanding here: a Worker is a Python process. Tasks are always running inside this Python process.

So each Worker is executing one task at a time. I don’t understand the diagram: is this a Dashboard representation? If so, isn’t that what you are already describing?

As for the first remark, I’m not sure I understand: considering your code, you only spawn two workers, and these two workers are doing the work; they should not be creating other processes.
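If you want to verify this, here is a small sketch (purely illustrative, not your workload) that records which process and thread each task actually runs in:

import os
import threading

import dask
from dask.distributed import Client, LocalCluster

cluster = LocalCluster(n_workers=2, threads_per_worker=4, processes=False)
client = Client(cluster)

@dask.delayed
def whereami(i):
    # Report which process and thread executed this task
    return i, os.getpid(), threading.get_ident()

results = dask.compute(*[whereami(i) for i in range(20)])
print(sorted({(pid, tid) for _, pid, tid in results}))

With processes=False you should see a single PID and a handful of thread ids that are reused across all 20 tasks: no new process is created per task.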

One other remark: you are generating input data on the client side, and sending it to the Worker when calling the delayed function. Could you generate or read the data directly in the delayed function? Or is the data small enough?
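For example, something along these lines (a hypothetical variant, assuming the volume were stored on disk as volume.npy) would only send the small slice bounds through the scheduler:

import dask
import numpy as np
import pandas as pd

@dask.delayed
def intense_func_from_disk(bounds, path='volume.npy'):
    # Load (or generate) the data inside the task instead of on the client
    (i0, i1), (j0, j1), (k0, k1) = bounds
    volume = np.load(path, mmap_mode='r')
    subvolume = np.asarray(volume[i0:i1, j0:j1, k0:k1])
    # ... do the intense work on subvolume ...
    return pd.DataFrame({'n_elements': subvolume.size}, index=[0])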

And finally, if you could build a minimal reproducer with fake data, this would really help to understand your problem!

Hi @guillaumeeb, thanks for your response. As a new user, I’m capped in the number of images I can post (one). Following the StackOverflow link should make it clearer (what I’m seeing vs. what I want to see).

This is what I currently see:

Whereas I’d like a task stream that looks similar to the one above, i.e. one where each worker executes a single task at a time, doesn’t create a new Python process on each new run, and cleans up its memory as soon as it’s finished.

You are generating input data on the client side, and sending it to the Worker when calling the delayed function. Could you generate or read the data directly in the delayed function? Or is the data small enough?

It’s small enough to be passed to each process; I don’t want to scatter it (there are other things going on as well that I’m simplifying over).

@guillaumeeb sorry for the delay in getting you a minimal reproducer. Here’s an example which demonstrates what I’m seeing:

import dask
import time
import copy
import numpy as np
import pandas as pd
from dask.distributed import Client, LocalCluster

# 96-core machine
num_workers=4
threads_per_worker=24

cluster = LocalCluster(
    n_workers=num_workers,
    threads_per_worker=threads_per_worker,
    resources = {'foo':1},
    processes=False,
)

client = Client(cluster)

@dask.delayed
def intense_func(volume, nrepeats):
    
    reference = copy.deepcopy(volume)
    
    start = time.time()
    
    for repeat in range(nrepeats):
        volume = np.fft.fftn(volume)
        volume = np.fft.ifftn(volume)
        
    end = time.time()
    
    same = np.allclose(reference, volume)
    
    return_dict = {'returns': same, 'time': end-start}
    
    return pd.DataFrame(return_dict, index=[0])

nx = ny = nz = 512
volume = np.random.uniform(low=0.,high=1.,size=(nx,ny,nz))

task_graph = []
ntransforms = 20
tasks = 20
for task in range(tasks):
    
    # generate dataset based on random parameters
    istart,iend = np.random.randint(0,nx//2), np.random.randint(nx//2,nx)
    jstart,jend = np.random.randint(0,ny//2), np.random.randint(ny//2,ny)
    kstart,kend = np.random.randint(0,nz//2), np.random.randint(nz//2,nz)
    
    subvolume = volume[istart:iend,jstart:jend,kstart:kend]
    
    task_graph.append(intense_func(subvolume, ntransforms))
    
df = dask.compute(*task_graph, resources={'foo':1}) # 'foo' doesn't seem right, but it doesn't schedule properly without it ¯\_(ツ)_/¯
df = pd.concat(df).reset_index(drop=True)

This function does repeated FFT/IFFT pairs on random subsets of some data volume, and then checks to make sure the output is the same as the original signal. This is the simplest analogue I could think of that mimics the computational intensity of what I’m doing.

Although Dask successfully only schedules 2 tasks to run at any one time, each new task appears to be spawned on a new process instead of recycling the same one. This means that, by the end of the 20 submissions, 20 jobs are being held in memory and only released at the end. So what I’m seeing is a task stream like this:

Whereas ideally I’d like to see a task stream like the one in the original post, where only two tasks execute at a time and release their resources once complete, to prevent memory from building up and giving me an OOM error.

I think there is a misunderstanding about how Dask behaves and distributes tasks.

A Worker is composed of one process and several threads. A given task is executed on an available thread of any Worker. Dask never creates new processes or new threads after the Workers are created.

With your setup, at most one task per Worker can run at a given time, but this task can be run by any of the 24 threads of each Worker. This is what you are seeing in the task stream: each line represents a different worker thread, not a new process.
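If the goal is simply one running task per Worker, a simpler setup than the 'foo' resources workaround might be to give each Worker a single thread (a sketch, not tested with your workload):

import dask
from dask.distributed import Client, LocalCluster

# Two single-threaded workers: at most two tasks can run at any time,
# without needing the resources={'foo': 1} trick
cluster = LocalCluster(n_workers=2, threads_per_worker=1, processes=False)
client = Client(cluster)

@dask.delayed
def work(x):
    return x * 2

results = dask.compute(*[work(i) for i in range(10)])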

So the OOM errors you are seeing are not related to this. There is probably something in your intense_func that doesn’t release the memory it used after execution.
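As an illustration of what I mean (just a sketch based on your reproducer, not a guaranteed fix), you could drop the large intermediates explicitly before returning, and optionally force a collection on the workers afterwards:

import gc

import dask
import numpy as np
import pandas as pd

@dask.delayed
def intense_func(volume, nrepeats):
    reference = volume.copy()
    for _ in range(nrepeats):
        volume = np.fft.ifftn(np.fft.fftn(volume))
    same = np.allclose(reference, volume)
    # Drop the large arrays before returning the one-row result,
    # so the worker only holds on to the small DataFrame
    del reference, volume
    gc.collect()
    return pd.DataFrame({'returns': same}, index=[0])

# After dask.compute(...), garbage collection can also be triggered on every worker:
# client.run(gc.collect)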