Best practice to distribute

I have very large HDF5 files, each with a dataset X of, for example, shape (24000000, 8000) and dtype int16.
I need to run a function on a subset of each of these columns, say X[50000:-50000, :].
This is way too big for memory, so I need to do something like

result = []
for x in X[50000:-50000, :].T:
    result.append(client.submit(myfunc, x))  # block until a worker is ready

basically where I go from the first to the last column and distribute, say, 10 columns to workers, and whenever one is done, compute the next column, then the next, etc.
This way I will only ever have 10 columns in memory, which will fit nicely…

What is the right way to do this using Dask?

I have come up with this manual solution which seems really super clumsy:

from dask.distributed import Client
import time
from itertools import chain
import numpy as np

client = Client()

def slow_func(x):
    # busy-wait to simulate a slow computation
    wt = np.random.randint(100, 22000000)
    for i in range(wt):
        pass
    return x.sum()

def any_finished(jobs):
    return any(j.status == 'finished' for j in jobs)

def split(jobs):
    done, pending = [], []
    for j in jobs:
        if j.status == 'finished':
            done.append(j)
        else:
            pending.append(j)
    return done, pending

jobs = []
res = []
my_simulated_hdf = np.random.randn(20, 10)
max_r = 4
for i in range(my_simulated_hdf.shape[1]):
    while len(jobs) >= max_r and not any_finished(jobs):
        time.sleep(0.01)  # poll until a worker is free
    done, jobs = split(jobs)
    if done:
        res.extend(f.result() for f in done)
    jobs.append(client.submit(slow_func, my_simulated_hdf[:, i]))
res.extend(f.result() for f in jobs)  # collect whatever is still pending


And furthermore, the ordering of the results is not even ensured in this clumsy solution…
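For what it's worth, the polling loop above can be written less manually with distributed's as_completed iterator (a sketch; slow_func here is just a stand-in for the real per-column work):

```python
from dask.distributed import Client, as_completed
import numpy as np

client = Client(processes=False)  # in-process cluster, just for this sketch

def slow_func(x):
    return x.sum()  # stand-in for the real slow computation

my_simulated_hdf = np.random.randn(20, 10)
max_r = 4

# prime the pipeline with max_r columns, then submit one new column
# every time a running task finishes
futures = [client.submit(slow_func, my_simulated_hdf[:, i]) for i in range(max_r)]
ac = as_completed(futures)
next_col, res = max_r, []
for future in ac:
    res.append(future.result())
    if next_col < my_simulated_hdf.shape[1]:
        ac.add(client.submit(slow_func, my_simulated_hdf[:, next_col]))
        next_col += 1
```

Results still arrive in completion order, though; if ordering matters, submit the column index alongside the data, or keep the futures in a dict keyed by column.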

Hi @benja,

I would recommend you take a look at dask arrays:
Create Dask Arrays — Dask documentation.

This will help you lazily load your data in an ordered way and apply transformations to it. Just use the appropriate chunk size for your use case (e.g. just one column per chunk?).

Then, using something like dask.array.map_blocks, you should be able to apply your function to each chunk.
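For instance (a sketch, using a NumPy array as a stand-in for the HDF5 dataset; with a real file you would wrap the h5py dataset in da.from_array the same way):

```python
import numpy as np
import dask.array as da

# Stand-in for the HDF5 dataset
# (with a real file: X = h5py.File('data.h5')['X'])
X = np.random.randint(0, 100, size=(2000, 10), dtype='int16')

# one full column per chunk
dX = da.from_array(X, chunks=(X.shape[0], 1))

def myfunc(block):
    # block has shape (nrows, 1); reduce it to a single value
    return block.sum(axis=0, keepdims=True)

# lazily slice, map over chunks, then compute: one value per column
result = dX[50:-50, :].map_blocks(myfunc, chunks=(1, 1)).compute()
```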


Hi @guillaumeeb, thank you very much for your answer. This looks very interesting and useful. However, how do I specify that I can hold, say, 10 blocks in memory at a time, so map_blocks can work on 10 blocks at a time?

This depends on your computing context. Are you on your personal laptop, on a single server, or on a compute cluster? How many resources (CPU, RAM) do you have? How much memory does each chunk need? Can this be lowered by reducing the chunk size?

In any case, you should configure your workers or your multiprocessing cluster accordingly.

I see you’re just using

client = Client()

which defaults to using all the machine capacity as the number of processes/threads and all the available memory. For example, say you have a laptop with 8 cores and 16GB memory: this will lead to 8 threads and 16GB of memory spread across the workers.

With this setup, Dask will be able to launch 8 tasks at the same time (one per thread, so one per core), so in your case 1 column per core with 2GB of memory available for each computation.
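You can also just ask the client what the zero-argument Client() picked on your machine (a small sketch; nthreads and scheduler_info are standard client methods):

```python
from dask.distributed import Client

client = Client()  # defaults derived from your machine's cores and memory

# threads per worker, keyed by worker address
nthreads = client.nthreads()

# memory limit per worker, from the scheduler's view of the cluster
mem_limits = {addr: info['memory_limit']
              for addr, info in client.scheduler_info()['workers'].items()}

print(nthreads)
print(mem_limits)
client.close()
```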

So the easiest way is just to configure your Dask cluster (so your workers) according to the problem you're trying to solve. If you need 4GB of memory for the operation on each chunk, then you should use something like:

from dask.distributed import Client, LocalCluster
cluster = LocalCluster(n_workers=4, threads_per_worker=1)
client = Client(cluster)

You can also do more complex things using Dask Resources mechanism, but I would advise against it and try to keep it simple for now.

Okay, I see @guillaumeeb.
I am on a single server, which is a relatively strong machine (256GB RAM and 88 cores).
The amount of memory I need for each chunk is directly related to the size of the HDF5 files: if you imagine that my HDF5 file has a single dataset with all my data, that dataset has X rows for each column. Each worker would then need to get passed a single column with X rows (thus a single vector). Since each of these vectors will require (worst case) 250MB of memory, there should be plenty of memory to feed one vector to each worker at the same time.
So let's say I build my dask array from the HDF5 file, chunk it as 1 full column per chunk, and have 1 worker per core (so 88 in total)… If I serve all chunks to Dask in a loop, I will reach chunk number 88 relatively quickly, and all workers will be busy. How will the reader then be able to block until a new worker is available?
I mean, what is the mechanism that will block the file reading while all workers are occupied, such that we don't just keep reading the whole file into memory and run out of memory?

Actually, you don't express it that way. You'll have a dask.array, which is an abstract representation of all your data, chunked by column (so 8000 chunks based on your first post). And you'll apply a map operation on all these chunks.

This is the point of Dask: it executes tasks when computing resources are available. Its scheduling system is also made for this; for example, it tries to execute a task graph depth-first if possible, thus avoiding reading too many chunks into memory. But your case seems to be embarrassingly parallel, so Dask will just stream the tasks to workers as resources become available. Dask is also lazy in its graph mechanism, which is used under the hood by dask.array.

So on your side, if the result of each chunk's computation is just a scalar or something small, there shouldn't be any problem.

The best is to try things out, and follow computation using Dask dashboard.
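Pulling the thread together, the whole pipeline might look like the sketch below. The file name 'demo.h5', the dataset key 'X', and myfunc are all placeholders; the demo writes a tiny file and uses a small in-process cluster, where the real setup would open the existing file and size the LocalCluster to the 88-core server:

```python
import h5py
import numpy as np
import dask.array as da
from dask.distributed import Client, LocalCluster

def myfunc(block):
    # placeholder for the real per-column function: one value per column
    return block.sum(axis=0, keepdims=True)

# A small demo file standing in for the real (24000000, 8000) dataset
data = np.random.randint(0, 100, size=(500, 12), dtype='int16')
with h5py.File('demo.h5', 'w') as f:
    f['X'] = data

# In-process cluster for this sketch; on the real server you would size it
# to the hardware, e.g. LocalCluster(n_workers=88, threads_per_worker=1).
cluster = LocalCluster(processes=False, n_workers=1, threads_per_worker=2)
client = Client(cluster)

with h5py.File('demo.h5', 'r') as f:
    # one full column per chunk; lock=True serializes HDF5 reads across threads
    X = da.from_array(f['X'], chunks=(f['X'].shape[0], 1), lock=True)
    # the real case would slice [50000:-50000, :]
    result = X[50:-50, :].map_blocks(myfunc, chunks=(1, 1)).compute()

client.close()
cluster.close()
```

Dask only materializes a column when a worker is free to process it, which is exactly the back-pressure asked about above.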
