Use multiple workers to load data into dask.array

Hi,

I have several dask workers connected to the scheduler and the dask client is set up fine. I would like all of the workers to be used to load data from external storage and put that data into one dask.array.

It looks like dask.array.from_delayed() will call my delayed function on ONE worker and then spread the data of the dask.array across the cluster. However, I would like all workers to be involved in loading the data, and the data loaded by each worker to stay on that same worker as a chunk of the dask.array.

Can someone give me a hand with this and correct me if I am wrong? Any comments would be appreciated.

Chris Ding

I think you can find a good example in the sql.py file. The idea is that you create multiple delayed function calls, each of which loads one chunk of data. Then you convert the list of delayed objects into a dask collection using the function you mentioned.

Some snippets copied below:

    for i, (lower, upper) in enumerate(zip(lowers, uppers)):
        cond = index <= upper if i == len(lowers) - 1 else index < upper
        q = sql.select(columns).where(sql.and_(index >= lower, cond)).select_from(table)
        parts.append(
            delayed(_read_sql_chunk)(
                q, uri, meta, engine_kwargs=engine_kwargs, **kwargs
            )
        )
    engine.dispose()

    return from_delayed(parts, meta, divisions=divisions)

The code that runs on each worker:

def _read_sql_chunk(q, uri, meta, engine_kwargs=None, **kwargs):
    import pandas as pd
    import sqlalchemy as sa

    engine_kwargs = engine_kwargs or {}
    engine = sa.create_engine(uri, **engine_kwargs)
    df = pd.read_sql(q, engine, **kwargs)
    engine.dispose()
    if len(df) == 0:
        return meta
    elif len(meta.dtypes.to_dict()) == 0:
        # only the index column is loaded
        # required only for pandas < 1.0.0
        return df
    else:
        return df.astype(meta.dtypes.to_dict(), copy=False)

Welcome @cuauty!

You’re right about the behavior of dask.array.from_delayed(). Indeed, the docs explicitly say:

“The dask array will consist of a single chunk.”

The docs also hint at the answer to your question:

“This routine is useful for constructing dask arrays in an ad-hoc fashion using dask delayed, particularly when combined with stack and concatenate.”

So you could use dask.array.from_delayed() for each chunk and afterwards concatenate the results to obtain a single dask array, as in the following:

import numpy as np
import dask
import dask.array as da


value_1 = dask.delayed(np.ones)(5)
chunk_1 = da.from_delayed(value_1, (5,), dtype=float)

value_2 = dask.delayed(np.zeros)(3)
chunk_2 = da.from_delayed(value_2, (3,), dtype=float)

array = da.concatenate([chunk_1, chunk_2], axis=0)  # dask will automatically assign workers to each chunk
array.chunks  # ((5, 3),)
array.compute()  # array([1., 1., 1., 1., 1., 0., 0., 0.])

Naturally, you’d need to replace np.ones and np.zeros with your function that loads your data from storage into memory.
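For instance, a minimal sketch of that substitution could look like the following (here load_chunk, the file paths, and the chunk shape are just made-up placeholders for your own storage layout):

import numpy as np
import dask
import dask.array as da

def load_chunk(path):
    # hypothetical loader: read one chunk from storage and return a numpy array
    return np.load(path)

paths = ["chunk_0.npy", "chunk_1.npy", "chunk_2.npy"]  # assumed chunk files
chunk_shape = (10, 5)  # assumed; the shape must be known up front for from_delayed

chunks = [
    da.from_delayed(dask.delayed(load_chunk)(p), shape=chunk_shape, dtype=float)
    for p in paths
]
array = da.concatenate(chunks, axis=0)  # one loading task per chunk

Each delayed call becomes its own task, so the scheduler is free to run the loads on different workers, and each resulting chunk normally stays on the worker that loaded it.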

There are of course other approaches you could consider that avoid from_delayed() altogether, such as map_blocks(), for instance. But I think the above should be sufficient for your needs. Feel free to follow up if it is not.

@ubw218

It turns out that dask.dataframe.from_delayed() (which you are referring to) works quite differently from dask.array.from_delayed() (which the OP is referring to), namely, the dataframe version can take multiple delayed objects as input while the array version cannot. This is probably due to the generally multidimensional nature of arrays (in contrast to dataframes, which are two-dimensional).
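To make the difference concrete, here is a small sketch of the dataframe version accepting a list of delayed objects (load_partition is a made-up loader):

import pandas as pd
import dask
import dask.dataframe as dd

@dask.delayed
def load_partition(i):
    # made-up loader returning one pandas partition
    return pd.DataFrame({"x": range(i * 3, (i + 1) * 3)})

parts = [load_partition(i) for i in range(4)]
meta = pd.DataFrame({"x": pd.Series(dtype="int64")})  # empty frame describing the schema
ddf = dd.from_delayed(parts, meta=meta)  # a list is accepted here, unlike da.from_delayed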

Perhaps the current dask.array API could be enhanced along the lines of your suggestion in the future. Feel free to open an issue (or pull request) about this at Issues · dask/dask · GitHub if you want.

Thanks a lot, @ParticularMiner. Your example runs well.

Unfortunately, though, it doesn't resolve the issue I ran into. I can define several delayed calls so that the data is read on all workers, but da.concatenate seems to put all chunks on one worker, which means the subsequent compute happens on ONLY ONE worker.

My code is as follows.

import numpy as np
import dask
import dask.array as da

value_1 = dask.delayed(np.ones)((10,5))
chunk_1 = da.from_delayed(value_1, (10, 5), dtype=float)

value_2 = dask.delayed(np.zeros)((10, 3))
chunk_2 = da.from_delayed(value_2, (10, 3), dtype=float)

array = da.concatenate([chunk_1, chunk_2], axis=0)  # dask will automatically assign workers to each chunk; I have indeed seen each worker involved
ar1 = array.persist()  # persist() returns a new collection; keep a reference so the chunks stay in distributed memory
de = dask.delayed(func)(ar1)  # "func" is my function that does something with the data
results = de.compute()  # however, in this step only one worker is used

In the above code snippet, the function “func” is called on only ONE worker and receives all the data of the dask.array as a single numpy.ndarray, while nothing happens on the other workers.

Is there any way to have my “func” called on all workers, with each worker receiving the data of its own chunk?

Thanks again.
Chris Ding

@cuauty

For dask arrays, one often uses dask.array.map_blocks() to apply a function chunk-wise. So I suggest that you don’t use dask.delayed but rather something like the following:

de = da.map_blocks(func, ar1)  # The “func” is my function to do something.
results = de.compute()  # In this step many workers are used in general

I encourage you to read the documentation: dask.array.map_blocks — Dask documentation
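To make the difference concrete, here is a small self-contained sketch (func is just a stand-in for your real computation):

import dask.array as da

def func(block):
    # stand-in for your real chunk-wise computation; receives one numpy array per chunk
    return block * 2

ar1 = da.ones((20, 5), chunks=(10, 5))  # two chunks, potentially on two different workers

de = da.map_blocks(func, ar1)
results = de.compute()  # each chunk is processed by the worker that holds it
results.shape  # (20, 5)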

@ParticularMiner

It makes sense. Thank you.

Furthermore, compute() is synchronous, which means that during the compute() call I can't do anything else in the meantime.

Is there an async solution for this? For example, calling da.map_blocks(func, …) and getting a Future for its execution. I see the Client interface has this kind of thing, as shown below, but after I pass the dask.array as an argument to client.submit, each “func” call receives the whole dask.array instead of a numpy.ndarray.

fs = []
for _ in range(worker_num):
    future = client.submit(func, ar1, pure=False)
    fs.append(future)
results = client.gather(fs)   # wait for all workers to finish

Thanks a lot.
Chris Ding

@cuauty

You could use client.compute() as described here: Async/Await and Non-Blocking Execution — Dask Examples documentation

Even though there the context is dataframes, I expect similar behavior to apply to dask arrays as well.

results = client.compute(de)  # non-blocking
await results
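Putting it together, a minimal sketch with an asynchronous client might look like this (the scheduler address and func are placeholders for your own setup):

import asyncio
import dask.array as da
from dask.distributed import Client

def func(block):
    # placeholder chunk-wise computation
    return block * 2

async def main():
    # assumes a reachable scheduler; replace the address with your own
    client = await Client("tcp://127.0.0.1:8786", asynchronous=True)
    ar1 = da.ones((20, 5), chunks=(10, 5))
    de = da.map_blocks(func, ar1)
    fut = client.compute(de)  # returns immediately, non-blocking
    # ... do other async work here while the cluster computes ...
    result = await fut        # gather the finished numpy array
    await client.close()
    return result

# asyncio.run(main())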