Currently I load my data with dask.array.from_delayed, which requires a "shape" argument. However, in my case I don't know the shape when I invoke from_delayed, so I pass arbitrary values and then call "compute_chunk_sizes" to get the correct ones. My code is as follows.
ds = []
for i in range(worker_num):
    chunk_part = dask.array.from_delayed(
        dask.delayed(settle_array)(name),  # settle_array returns a numpy.ndarray
        (1000, 100),  # 1000 and 100 are both arbitrary values
        dtype=float,  # np.float is deprecated; use the builtin float
    )
    ds.append(chunk_part)
array = dask.array.concatenate(ds, axis=0)  # automatically assign workers to each chunk
one_da = array.persist()
one_da.compute_chunk_sizes()
My first question: although the above code works well in my tests, is this the proper solution for my requirement (not knowing the chunk shapes in advance, so using arbitrary values)? Are there any side effects?
My second question: if another function modifies the ndarray returned by "settle_array", will the dask array's values change at the same time? In other words, when "settle_array" returns the ndarray, does Dask COPY it or just share it?
It’s probably better to use a value like (np.nan, np.nan) for the shape argument. See: Chunks — Dask documentation.
Regarding your second question: Very good question! I don’t have a definitive answer without first testing it myself. And I suppose the answer depends on the implementation of settle_array(). So I think it would be best for you to find this out for yourself.
For my first question: if I use (np.nan, np.nan) for the shape argument as follows, the concatenate call complains that chunks with nan shapes can't be concatenated.
ds = []
for i in range(worker_num):
    chunk_part = dask.array.from_delayed(
        dask.delayed(settle_array)(name),  # settle_array returns a numpy.ndarray
        (np.nan, np.nan),  # np.nan means unknown shape
        dtype=float,  # np.float is deprecated; use the builtin float
    )
    ds.append(chunk_part)
array = dask.array.concatenate(ds, axis=0)  # raises: can't concatenate chunks with nan shape
one_da = array.persist()
one_da.compute_chunk_sizes()
It looks like I must specify at least one concrete value for the shape, e.g. (np.nan, 10).
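To illustrate, here is a small sketch (with a made-up settle_array that returns a varying number of rows but a fixed 10 columns) showing that (np.nan, 10) lets the concatenate along axis 0 go through, since only the axis-1 sizes need to be concrete and matching:

```python
import numpy as np
import dask
import dask.array as da

def settle_array(i):
    # stand-in for the real function: row count varies, column count is fixed
    return np.ones((i + 1, 10))

ds = []
for i in range(3):
    ds.append(
        da.from_delayed(
            dask.delayed(settle_array)(i),
            shape=(np.nan, 10),  # only axis 0 is unknown
            dtype=float,
        )
    )
array = da.concatenate(ds, axis=0)  # works: axis-1 sizes are concrete and match
array.compute_chunk_sizes()
print(array.shape)  # (6, 10): 1 + 2 + 3 rows
```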
For my second question, I wrote some code to verify it. It looks like Dask does not copy the contents of the returned ndarray and uses it directly as the chunk of the dask array.
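A quick check along those lines (my own sketch, using a single-chunk array and the default local scheduler; with a distributed cluster the chunk would be serialized between processes, i.e. effectively copied, so the result can differ there):

```python
import numpy as np
import dask
import dask.array as da

source = np.zeros((2, 2))

def settle_array():
    return source  # hand back the ndarray itself, no copy

arr = da.from_delayed(dask.delayed(settle_array)(), shape=(2, 2), dtype=float)
one_da = arr.persist()   # the chunk now held in memory is `source` itself
source[:] = 1.0          # mutate the original ndarray afterwards
print(one_da.compute())  # reflects the mutation: all ones
```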
However, as the documentation describes, compute_chunk_sizes triggers an immediate in-place computation. I guess my delayed tasks are executed at that point, which makes my "settle_array" calls run step by step rather than in parallel.
Good point. Well then, it's up to you to ensure that the expected and computed chunk sizes agree with each other at some point in your program. The disparity could confuse anyone else who uses or reads your program.
In contrast, np.nan is often preferable since it clearly tells the user that the chunk sizes are unknown. The following is a slightly more advanced way of achieving your goal:
import numpy as np
import dask.array as da
from dask.highlevelgraph import HighLevelGraph
from dask.base import tokenize

def settle_array(name):
    # This is just an example chunk-function in which the shape of
    # the output is unknown
    return np.array([[1, 1], [0, 0]])[: 1 if np.random.rand() < 0.5 else 2]

worker_num = 5
name = "example"  # whatever argument your settle_array expects
array_name = "settle_array-" + tokenize(name)
dsk = dict()
for i in range(worker_num):
    dsk[(array_name, i, 0)] = (settle_array, name)
axis0_chunks = (np.nan,) * worker_num
axis1_chunks = (np.nan,)
graph = HighLevelGraph.from_collections(array_name, dsk, dependencies=[])
array = da.core.Array(
    graph, array_name, (axis0_chunks, axis1_chunks), dtype=np.float64, meta=np.array([[]])
)
array.compute_chunk_sizes()