So at first I thought the problem was a worker timeout. However, it took a while for the full error to materialize, and by the end it’s clear it’s an out-of-memory error:
I’m surprised that it’s a 0-dim vector, which leads me to think there’s a shape problem. I also see it’s uint8; I have no idea where that’s coming from, since I create dask arrays in my code and it’s float32 or float16 everywhere.
What would be a way to debug what’s going on? Hoping to identify the part of the code that’s causing this.
@velosipednikov The futures-cancelled error often shows up when one of your tasks has crashed remotely and is eventually cancelled. I would monitor your memory usage while executing your graph and see whether any of your workers are getting close to the memory limit. If so, check the scheduler logs and see whether workers are being removed. If they are, your task is taking up too much memory and you need to slim it down or increase your worker size.
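Purely as an illustration (this is not from the thread), the same information can be pulled programmatically, assuming client is a connected distributed.Client:

# Hypothetical sketch: inspect per-worker memory and recent logs from the client
from distributed import Client

client = Client()  # or connect to your existing scheduler

# Memory limit and current memory metric reported by the scheduler for each worker
for addr, info in client.scheduler_info()["workers"].items():
    print(addr, info.get("memory_limit"), info.get("metrics", {}).get("memory"))

# Recent scheduler/worker logs; look for "Removing worker" or restart messages
print(client.get_scheduler_logs(n=50))
print(client.get_worker_logs(n=50))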
@secrettoad I’ve tried that specific memory setting on another computationally-intensive operation on my machine and it worked. The problem is that I can’t bring the dashboard up - it never loads, so I don’t know what’s going on with the memory usage.
How would I examine the scheduler logs you’re referring to?
I realized there was an inefficiency in my original processing logic, which I’ve updated below. Nevertheless, even with this update (after which I can at least see the dashboard now), I’m beginning to wonder whether either:
a) I’m programming this egregiously wrong from a dask optimization perspective
b) Dask does not speed up the task at hand and I should abandon it
The reason for these suspicions is that the process as I have it below, on only 10K timestamps, failed after running for 128 minutes (after all the workers died). In comparison, I have a process that just uses loops without any dask on 100K timestamps, and the entire job (of which this particular piece of code takes up approximately half the time) takes 89 minutes. So, something is definitely not right.
The gist of the update is that I used to pass the entire test_arr to create_array_chunk and then subset by i and j, but that is unnecessary. I now subset test_arr by i, j before passing that vector to create_array_chunk.
import dask.array as da
from dask import delayed, config
from distributed import Client
import numpy as np
client = Client(n_workers=4, threads_per_worker=1, memory_limit='3.8GB')
n_cols = 12
n_params = 11
n_rows = 10000
x_size = 100000
test_arr = da.random.uniform(low=0, high=1, size=(n_rows, n_cols, n_params), chunks=(1000, n_cols, n_params)).astype('float32')
x = da.linspace(0, 1, x_size, dtype='float32')
def create_array_chunk(param_vec, x):
    # There will be more triggers of this condition in my actual use case
    # compared to dummy numbers in test_arr
    if param_vec[10] == 0:
        ind = da.isclose(param_vec[0], x, atol=1e-5)
        p_x = da.zeros_like(x, dtype='float32')
        p_x[ind] = 1
    else:
        a = param_vec[0]
        b = param_vec[1]
        c = param_vec[2]
        d = param_vec[3]
        e = param_vec[4]
        f = param_vec[5]
        g = param_vec[6]
        h = param_vec[7]
        i = param_vec[8]
        j = param_vec[9]
        list_a = [a, b, c, d, e]
        list_b = [f, g, h, i, j]
        p_x = da.where((x >= a) & (x <= e),
                       np.interp(x, list_a, list_b), 0)
    p_x = p_x / da.sum(p_x)
    return p_x
i_arrays = []
for i in range(test_arr.shape[0]):
    j_arrays = []
    for j in range(test_arr.shape[1]):
        darray = da.from_delayed(delayed(create_array_chunk)(test_arr[i, j, :], x),
                                 dtype=np.float32,
                                 shape=(x_size,))
        j_arrays.append(darray)
    j_stack = da.stack(j_arrays, axis=0)
    i_arrays.append(j_stack)
res = da.stack(i_arrays, axis=0).rechunk((1000, 1, x_size))
res.compute()
For reference, the task stream has lots of whitespace and only 3 workers at the point I took the screenshot (it looks like 1 worker has been completely disabled). I’m also not sure whether the 240K ‘finalize’ operations are slowing things down quite a bit.
Dask adds overhead in order to distribute tasks, so if you are trying to optimize for time, you probably want to use significantly more than 3 or 4 workers. Otherwise you may indeed end up taking longer than without dask to complete the original task.
Also, I would recommend putting your data into a dask array and, from there, using vectorized/more efficiently distributed functions as opposed to custom delayed functions. Custom logic on matrices/dataframes is almost always going to be slower than using prebuilt, pre-optimized functions maintained by the dask team.
I only have 4 cores with 2 threads each and 16 GB RAM on my machine, so I think I am limited to 8 workers? And if I specify 8, then I have to decrease the memory allocation accordingly, meaning that I also have to decrease the chunk size.
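For illustration, a configuration along these lines (the exact numbers are an assumption based on the 16 GB total, not something from the thread) trades per-worker memory for more workers:

from distributed import Client

# Hypothetical alternative: 8 single-threaded workers, each with a smaller memory budget
# (8 x ~1.9 GB stays under the 16 GB total; chunk sizes would need to shrink to match)
client = Client(n_workers=8, threads_per_worker=1, memory_limit='1.9GB')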
Yes, I had already understood from these problems with delayed that I can use map_blocks instead. I unfortunately need to use a custom function, but at least I can apply that function to a chunk and then loop through within it.
First, I don’t think looping over the indices of a Dask Array is a good use of it. Dask Arrays are lazy structures, and I’m not sure about the result of this pattern.
The Dashboard snapshot you provided shows that Dask is spilling a lot to disk, which might explain why it is so slow. This spilling is expected: your resulting array is 44.7 GiB, and Dask will need to build it entirely in worker memory before sending it back to the client upon calling compute(), which won’t work if you only have 16 GB RAM on your laptop.
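As a quick back-of-the-envelope check of that 44.7 GiB figure, the result has shape (n_rows, n_cols, x_size) in float32:

# 10,000 x 12 x 100,000 float32 values, 4 bytes each
n_bytes = 10_000 * 12 * 100_000 * 4   # 48,000,000,000 bytes
print(n_bytes / 2**30)                # ~44.7 GiB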
I’m not sure what you are trying to achieve, and there are other questions I have:
How do you read your input data in the real workflow?
What is the real size of test_arr and x? From this example, I’m not sure why you are using Dask Arrays as inputs.
Ideally, you should work only with Numpy when working in chunks (e.g. in create_array_chunk function).
So yes, I think the correct approach is using map_blocks, but it also depends on how your input is created and on the real workflow.
Based on what I understood, here is some code I put together, not sure if it makes sense:
import dask.array as da
from dask import delayed, config
from distributed import Client
import numpy as np
client = Client()
# Just use Numpy inside this function
def create_array_chunk_np(param_vec, x):
    # There will be more triggers of this condition in my actual use case
    # compared to dummy numbers in test_arr
    if param_vec[10] == 0:
        ind = np.isclose(param_vec[0], x, atol=1e-5)
        p_x = np.zeros_like(x, dtype='float32')
        p_x[ind] = 1
    else:
        a = param_vec[0]
        b = param_vec[1]
        c = param_vec[2]
        d = param_vec[3]
        e = param_vec[4]
        f = param_vec[5]
        g = param_vec[6]
        h = param_vec[7]
        i = param_vec[8]
        j = param_vec[9]
        list_a = [a, b, c, d, e]
        list_b = [f, g, h, i, j]
        p_x = np.where((x >= a) & (x <= e),
                       np.interp(x, list_a, list_b), 0)
    p_x = p_x / np.sum(p_x)
    return p_x
n_cols = 12
n_params = 11
n_rows = 10000
x_size = 100000
test_arr = da.random.uniform(low=0, high=1, size=(n_rows, n_cols, n_params), chunks=(100, n_cols, n_params)).astype('float32')
x = np.linspace(0, 1, x_size, dtype='float32') #Switching to plain Numpy array
# Again, Numpy only
def process_np_chunk(chunk, x):
    i_arrays = []
    for i in range(chunk.shape[0]):
        j_arrays = []
        for j in range(chunk.shape[1]):
            tmp_array = create_array_chunk_np(chunk[i, j, :], x)
            j_arrays.append(tmp_array)
        j_stack = np.stack(j_arrays, axis=0)
        i_arrays.append(j_stack)
    res = np.stack(i_arrays, axis=0)
    return res  # without this return, map_blocks would receive None chunks
res = test_arr.map_blocks(process_np_chunk, x, chunks=(100, 12, x_size), dtype='float32')
# Cannot compute the result, not enough memory, just stream it to disk
da.to_npy_stack("/work/scratch/eynardbg/data_dask_2065", res)
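As a possible follow-up (not part of the original reply), the stacked .npy files written this way can later be reloaded lazily, assuming the same path:

# Lazily reload the stored result later as a chunked dask array
res = da.from_npy_stack("/work/scratch/eynardbg/data_dask_2065")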
Also, not sure if this is expected, but the create_array_chunk function is returning all NaN with this warning:
RuntimeWarning: invalid value encountered in divide
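For what it’s worth, this typically happens when np.sum(p_x) is 0 (for example when no x falls inside [a, e]), so the normalization divides 0 by 0 and yields NaN. Purely as a sketch (a hypothetical helper, not from the thread), a guard could look like:

import numpy as np

def safe_normalize(p_x):
    # Hypothetical guard: avoid 0/0 -> NaN when the unnormalized values sum to 0
    total = np.sum(p_x)
    return p_x / total if total > 0 else p_x

print(safe_normalize(np.zeros(5, dtype='float32')))  # stays all zeros instead of NaN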