Estimating disk space needed for temporary files

Hello,

I’m a new Dask user.
I launched a job using Client and was surprised that 95 GB worth of temporary files were created before the execution crashed - that's as much free space as I had on my hard drive.

  1. How would I estimate how much disk space is needed for the temporary files?

Here is my code:

from functools import reduce

from distributed import Client
import dask.array as da

client = Client()

size = 100000

vals = da.linspace(0, 1, size).astype('float32')

test = da.random.uniform(
    low=0, high=1, size=(100000, 12, size), chunks=(200, 1, size)
).astype('float16')

# the index lists below cover the 12 positions along axis 1 of `test`
var_lookup = {
    'a': [0, 1],
    'b': [0, 1],
    'c': [0],
    'd': [0, 1],
    'e': [0],
    'f': [0, 1, 2],
    'g': [0],
}

def custom_convolve(x, y):
    temp_lst = []
    for i in range(x.shape[0]):
        a = da.fft.rfft(x[i])
        b = da.fft.rfft(y[i])
        conv_res = da.fft.irfft(a * b)
        temp_lst.append(conv_res)
    res = da.stack(temp_lst, axis=0)
    return res

n_groups = len(var_lookup.keys())

counter = 0
group_cols = []
for i in var_lookup.keys():
    grp = var_lookup[i]
    if len(grp) == 1:
        temp = test[:, counter, :]
        counter += 1
    else:
        test_list = []
        for _ in var_lookup[i]:
            test_list.append(test[:, counter, :])
            counter += 1
        temp = reduce(lambda x, y: da.map_blocks(custom_convolve, x, y, dtype='float32'), test_list)

    res = (da.sum(temp[:, :] * vals**2, axis=1) - (da.sum(temp[:, :] * vals, axis=1)**2)).astype('float32')

    group_cols.append(res)

final_res = da.stack(group_cols, axis=1)

final_res.compute()

  2. Please explain what Dask is storing in these temporary files. Are these the interim calculations of the workers (in my case, the convolutions of vectors)?

  3. How would I decide whether to use a Client or not? I understand that multi-threading still occurs if I don't use a Client, but I don't understand which option is better in my context.

  4. Without launching a Client, is there also a way to look at a dashboard?

I tried to play with your code to understand what was happening, but it's a bit too complicated for me. Could you reproduce this kind of behavior with a smaller part of the code?
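
If it helps, here is a minimal sketch of the kind of reduced reproducer I mean. It is not your workload: it just caps a LocalCluster worker at a deliberately small memory limit and persists an array slightly larger than that limit, so the spill-to-disk behavior shows up at a small scale.

from dask.distributed import Client, LocalCluster
import dask.array as da

# One worker capped at ~1 GiB so that spilling kicks in quickly.
cluster = LocalCluster(n_workers=1, threads_per_worker=2, memory_limit="1GiB")
client = Client(cluster)

# ~1.6 GB of float64 values: more than the worker is allowed to hold.
x = da.random.uniform(size=(200_000_000,), chunks=10_000_000)
print(f"array size: {x.nbytes / 1e9:.1f} GB")

# persist() keeps every chunk alive on the worker, so once the memory
# target is crossed, excess chunks are written to the worker's local
# directory -- these are the temporary files from the question.
x = x.persist()
print((x * x).sum().compute())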

First, there is the same misunderstanding as in "TypeError on da.argmax when executing compute".
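
My reading of that misunderstanding, applied here (this is an assumption, not something I verified against your run): the function you hand to map_blocks receives plain NumPy chunks, so custom_convolve should operate on them with np.fft rather than building new Dask arrays with da.fft. A sketch of what that version could look like, with the Python loop replaced by a vectorized FFT along the last axis:

import numpy as np

def custom_convolve(x, y):
    # map_blocks passes NumPy chunks into this function, so use np.fft:
    # calling da.fft here would wrap each chunk in a new Dask graph
    # instead of computing the FFT directly.
    a = np.fft.rfft(x, axis=-1)
    b = np.fft.rfft(y, axis=-1)
    # irfft recovers the original (even) length along the last axis.
    return np.fft.irfft(a * b, axis=-1).astype('float32')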

Dask tries to stream the computation as much as it can, which means avoiding storing intermediate results when it doesn't need to. In this case, it seems that somewhere within the computation it needs to keep some intermediate chunk results around in order to perform an aggregation later, so it spills that data to disk until the aggregation can run. I didn't find where after looking at your code for a few minutes; the final result is not big, so I'm not sure.
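
That also gives you a rough way to answer question 1: the nbytes attribute tells you how big each lazy array would be if it were fully materialized, which is an upper bound on what could be spilled. For your test array that is about 240 GB (100000 × 12 × 100000 float16 values at 2 bytes each), so filling 95 GB of free disk is not surprising. A sketch (the config keys are standard Dask ones; the path is a placeholder to replace with a disk that has room):

import dask
import dask.array as da

test = da.random.uniform(
    low=0, high=1, size=(100000, 12, 100000), chunks=(200, 1, 100000)
).astype('float16')

# Upper bound on what could end up on disk if everything were materialized:
print(f"full array: {test.nbytes / 1e9:.0f} GB")                  # ~240 GB
print(f"one chunk:  {test.blocks[0, 0, 0].nbytes / 1e6:.0f} MB")  # ~40 MB

# Point spilled data at a disk with more free space, and tune the
# fractions of the per-worker memory limit at which spilling starts.
# Set this before starting the Client so the workers pick it up.
dask.config.set({
    "temporary-directory": "/path/with/space",  # placeholder path
    "distributed.worker.memory.target": 0.6,    # start spilling here
    "distributed.worker.memory.spill": 0.7,     # spill based on process memory
})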

Well, that depends on the kind of computation you're doing (threading vs. multiprocessing), on the IO (multiprocessing will be better at some point), and on whether you want to scale to several servers (then you'll need distributed Dask and thus a Client).
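
Concretely, the three options look like this (a generic sketch, not tuned to your workload):

import dask.array as da
from dask.distributed import Client

x = da.random.uniform(size=(10_000, 10_000), chunks=(1_000, 1_000))

# Threaded scheduler: the default for dask.array, good for NumPy-style
# work that releases the GIL (FFTs, reductions, elementwise math).
x.sum().compute(scheduler="threads")

# Multiprocessing scheduler: can help when the per-chunk functions are
# pure Python and hold the GIL, at the cost of moving data between processes.
x.sum().compute(scheduler="processes")

# Distributed scheduler: needed to scale past one machine, and the only
# one with the diagnostic dashboard. Creating a Client makes it the default.
client = Client()
x.sum().compute()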

Unfortunately not.
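
The dashboard is a feature of the distributed scheduler, so you need a Client (even a purely local one) to get it. Once you have one, the address is on the client object:

from dask.distributed import Client

client = Client()             # local cluster, dashboard included
print(client.dashboard_link)  # e.g. http://127.0.0.1:8787/status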