Ideally I’d like to leverage dask for distributing work between several machines as well as between multiple worker processes on each machine.
Each task is CPU-heavy and needs read-only access to the same (large) input data.
Hence, in principle, the input data could be stored only once per machine, provided each worker process has shared access to it.
On a single machine this can be achieved by instantiating the input data once as a global in the parent process. The forked child workers (e.g. via concurrent.futures.ProcessPoolExecutor) then share that memory via copy-on-write, as long as it is not written to (on Unix-based OSes).
On a dask LocalCluster, however, the input data (once created) is explicitly transferred to each worker, even when the workers reside on the same machine. Memory consumption is therefore roughly NUM_WORKERS times higher.
Both approaches are illustrated in the pseudocode below:
import concurrent.futures
import dask.distributed

def create_data():
    return big_chunk_of_data  # placeholder for the large, read-only input

def process_data(data):
    ...  # some CPU-bound work on the (read-only) data

# Dask approach
# transfers the data to every worker process, even on the same machine -> duplication
def dask_run():
    client = dask.distributed.Client(n_workers=NUM_WORKERS, threads_per_worker=1)
    data = client.submit(create_data)
    # pure=False, otherwise the identical calls would be deduplicated into a single task
    futures = client.map(process_data, [data] * NUM_TASKS, pure=False)
    return client.gather(futures)

# global multiprocessing approach
# leverages copy-on-write forking (on Unix-based OS) -> no duplication
DATA = create_data()

def process_data_global(_dummy):
    return process_data(DATA)  # reads the global via closure

def mp_run():
    with concurrent.futures.ProcessPoolExecutor(max_workers=NUM_WORKERS) as executor:
        results = executor.map(process_data_global, range(NUM_TASKS))
        return list(results)
Is there a way to achieve a similar memory sharing using dask?
One workaround I can think of is creating only a single dask worker per machine and then using (standard-library) multiprocessing within each worker. Ideally I’d avoid this, as it mixes two multiprocessing paradigms and becomes less flexible in inhomogeneous cluster set-ups (e.g. few workers on some machines, many on others). I’d prefer to manage this within the dask cluster setup, not within the worker code.
Shared memory between processes (within or between nodes) is not a solved problem for dask-distributed, although I and others have made POCs of the idea. However, you are free to play with the threads/processes mix of your cluster, and threads, of course, share memory. Since Dask always tries to move code to the data rather than copying data (data locality), shared memory has not seemed so important in practice, especially in the many-node case. That said, massive single-node machines are more common than they used to be, so it might be time to revisit.
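For illustration, here is a minimal sketch of the threads-heavy mix, assuming the create_data/process_data functions and the NUM_* constants from the question above: one worker process (per machine) with many threads, so all tasks running on that machine share a single in-memory copy of the data.

    from dask.distributed import Client, LocalCluster

    # one worker process with NUM_WORKERS threads -> a single copy of the data on this machine
    cluster = LocalCluster(n_workers=1, threads_per_worker=NUM_WORKERS)
    client = Client(cluster)

    data = client.scatter(create_data(), broadcast=True)  # one copy per worker process
    futures = client.map(process_data, [data] * NUM_TASKS, pure=False)
    results = client.gather(futures)

The usual caveat: for CPU-bound pure-Python code the GIL will serialize those threads, so this mainly pays off when the heavy work releases the GIL (numpy, pandas, etc.).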
Thanks. After your message I was able to find this issue:
Is this the right one to monitor progress for this use case? User applio seems to have similar workloads.
Until something like this is (or isn’t) implemented, could you suggest a workaround for sharing a pandas.DataFrame? I have read through the issue and seen your gist, but I am slightly confused about what would be the most fitting way of sharing the memory (files/in-memory/…) and whether/how it is compatible with pandas (SharedArray is numpy-only).
That is the thing I was referring to when I said “my work”, but I think there are some other experiments around, perhaps linked there.
Indeed, it deals with numpy, which is the easiest case. For dataframes, you would presumably share a bunch of numpy arrays, or maybe Arrow buffers (if they can become dataframes without a copy). Arrow does have dedicated mechanisms for this, including its own shared-memory manager process that could be hooked into: The Plasma In-Memory Object Store — Apache Arrow v9.0.0
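As a rough, non-dask-specific sketch of the numpy route (the block name "demo_block", the column name and the sizes are made up here), the standard library’s multiprocessing.shared_memory can hold a numeric column once per machine, with each process wrapping the same buffer:

    import numpy as np
    import pandas as pd
    from multiprocessing import shared_memory

    # producer: copy the numeric column into a named shared-memory block (done once)
    values = np.arange(1_000_000, dtype="float64")
    shm = shared_memory.SharedMemory(create=True, size=values.nbytes, name="demo_block")
    np.ndarray(values.shape, dtype=values.dtype, buffer=shm.buf)[:] = values

    # consumer (any process on the same machine): attach and wrap the buffer without copying it
    shm_view = shared_memory.SharedMemory(name="demo_block")
    col = np.ndarray((1_000_000,), dtype="float64", buffer=shm_view.buf)
    df = pd.DataFrame(col.reshape(-1, 1), columns=["x"], copy=False)  # copy=False may still copy for some dtypes/versions

    # when finished: drop df/col, then shm_view.close() in each consumer and shm.unlink() once

Only the column buffer is shared this way; the DataFrame wrapper (index, metadata) is rebuilt cheaply in each process, and object/string columns would need something else, e.g. the Arrow route above.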