Hello,
I’m a new Dask user.
I launched a job using `Client` and was surprised that 95 GB of temporary files were created before the execution crashed; that is as much free space as I had on my hard drive.
- How would I estimate how much disk space is needed for the temporary files?
Here is my code:
```python
from functools import reduce
from distributed import Client
import dask.array as da

client = Client()

size = 100000
vals = da.linspace(0, 1, size).astype('float32')
test = da.random.uniform(low=0, high=1, size=(100000, 12, size),
                         chunks=(200, 1, size)).astype('float16')

# number of total values is equal to the 12 above
var_lookup = {
    'a': [0, 1],
    'b': [0, 1],
    'c': [0],
    'd': [0, 1],
    'e': [0],
    'f': [0, 1, 2],
    'g': [0],
}

def custom_convolve(x, y):
    temp_lst = []
    for i in range(x.shape[0]):
        a = da.fft.rfft(x[i])
        b = da.fft.rfft(y[i])
        conv_res = da.fft.irfft(a * b)
        temp_lst.append(conv_res)
    res = da.stack(temp_lst, axis=0)
    return res

n_groups = len(var_lookup.keys())
counter = 0
group_cols = []
for i in var_lookup.keys():
    grp = var_lookup[i]
    if len(grp) == 1:
        temp = test[:, counter, :]
        counter += 1
    else:
        test_list = []
        for _ in var_lookup[i]:
            test_list.append(test[:, counter, :])
            counter += 1
        temp = reduce(lambda x, y: da.map_blocks(custom_convolve, x, y, dtype='float32'), test_list)
    res = (da.sum(temp[:, :] * vals**2, axis=1) - (da.sum(temp[:, :] * vals, axis=1)**2)).astype('float32')
    group_cols.append(res)

final_res = da.stack(group_cols, axis=1)
final_res.compute()
```
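For reference, this is how I tried to estimate the nominal in-memory sizes of the arrays above (`.nbytes` reports the logical, uncompressed size and does not trigger any computation); I am not sure how this relates to the size of the temporary files:

```python
# Rough size check of the arrays defined above (logical, in-memory sizes only).
print(f"test:  {test.nbytes / 1e9:.0f} GB")          # 100000 x 12 x 100000 float16 -> ~240 GB
print(f"vals:  {vals.nbytes / 1e6:.1f} MB")          # 100000 float32 -> ~0.4 MB
print(f"chunk: {200 * 1 * size * 2 / 1e6:.0f} MB")   # one (200, 1, 100000) float16 chunk -> ~40 MB
```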
- Please explain what Dask is storing in these temporary files. Are these the interim calculations of the workers (in my case, convolutions of vectors)?
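My current understanding (please correct me if it is wrong) is that workers spill data to disk once they approach their memory limit, controlled by the `distributed.worker.memory.*` settings; the fractions below are only illustrative, not my actual configuration:

```python
import dask

# My understanding: each worker starts writing ("spilling") data to its local
# directory once memory use passes these fractions of its memory limit.
dask.config.set({
    "distributed.worker.memory.target": 0.60,  # start moving data to disk
    "distributed.worker.memory.spill": 0.70,   # spill more aggressively
    "distributed.worker.memory.pause": 0.80,   # pause executing new tasks
})
```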
- How would I decide whether or not to use a `Client`? I understand that multi-threading still occurs if I don't use a `Client`, but I don't understand which option is better in my context. The two ways of running the computation that I know of are sketched below.
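The `n_workers`, `threads_per_worker`, and `memory_limit` values here are just examples, not my actual settings:

```python
from distributed import Client

# Option A: no Client -- the default local scheduler for dask arrays,
# a thread pool in a single process; no spill files as far as I know.
final_res.compute(scheduler="threads")

# Option B: a distributed Client backed by a local cluster of worker
# processes; this gives the dashboard but can spill to disk.
client = Client(n_workers=4, threads_per_worker=2, memory_limit="8GB")
final_res.compute()
```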
- Without launching a `Client`, is there also a way to look at a dashboard?
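The only alternative I have found so far is the local diagnostics in `dask.diagnostics` (progress bar and profilers), which produce a static report rather than a live dashboard; is that the intended approach?

```python
from dask.diagnostics import ProgressBar, Profiler, ResourceProfiler, visualize

# Without a Client: the single-machine schedulers have their own diagnostics.
# This records task and CPU/memory profiles and renders a static bokeh plot.
with ProgressBar(), Profiler() as prof, ResourceProfiler() as rprof:
    final_res.compute(scheduler="threads")

visualize([prof, rprof])
```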