I am trying to use Dask to run large-scale simulations. The pattern I am currently following is this:
- Pull a large amount of data from the DB as a Python dictionary
- Pass this data into dask.delayed
- Run a large number of simulations
I am running into a few issues:
If I use all of the available workers on my MacBook (8), each worker gets about 2 GB of memory. With that setup I am not even able to pass my data into dask.delayed, because the workers fail with "worker exceeded memory budget" errors.
If I use only 4 workers, each with 4 GB of memory, I can successfully delay the data, but every worker then seems to load the whole dictionary into its own memory, and I am flooded with garbage-collection warnings like this:
distributed.utils_perf - WARNING - full garbage collections took 25% CPU time recently (threshold: 10%)
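For reference, here is roughly how I am creating the client in each case (the explicit n_workers / memory_limit values below are just my understanding of how Dask splits my machine's memory by default, so they may be slightly off):

from dask.distributed import Client

# Case 1: all 8 workers, ~2 GB each; workers die with memory budget errors
# client = Client(n_workers=8, memory_limit="2GB")

# Case 2: only 4 workers, ~4 GB each; delaying works but I get the GC warnings above
client = Client(n_workers=4, memory_limit="4GB")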
I have also tried dumping my data to a JSON file and letting a delayed task use json.load to read the file back in, but I run into the same problem: an individual worker does not have enough memory to read it.
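Roughly what that JSON attempt looked like (the file path and the small placeholder dict are just stand-ins for my real data):

import json
import dask

data = {"key_1": {"blah": "value"}}  # placeholder; really the big dict from the DB

# Dump the dictionary once on the local machine
with open("simulation_data.json", "w") as f:  # hypothetical path
    json.dump(data, f)

# Have a task read the file lazily instead of passing the dict around directly
def load_data(path):
    with open(path) as f:
        return json.load(f)

delayed_data = dask.delayed(load_data)("simulation_data.json")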
I am wondering what the best practices are here. Ideally I would love to keep the dictionary in my local memory and have each worker simply reference it, if that is even possible (I sketch one idea after the code below). The goal is to be able to run these simulations with all available workers.
My current code looks something like this:
import dask
from dask.distributed import Client
client = Client()
data = {'key_1': {'data': {'blah': 'value'}}, 'key_2': {'more_data': {'blah_blah': 'values'}}}
delayed_data = dask.delayed(data)
parameters = [{"a": 1}, {"a": 2}]
simulations = []
for params in parameters:
    # run_simulation is my actual simulation function, defined elsewhere;
    # it takes the full data dict plus one parameter set
    simulations.append(dask.delayed(run_simulation)(delayed_data, params))
results = dask.compute(*simulations)
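One idea I keep coming back to from the distributed docs is client.scatter. A rough, untested sketch of what I mean is below; I am not sure whether scattering/broadcasting a dictionary this large is the intended pattern or whether it just recreates the same per-worker memory problem:

# Rough sketch only: push the dict to the cluster once, then pass the resulting
# future into every simulation task instead of wrapping the dict in dask.delayed
data_future = client.scatter(data, broadcast=True)

simulations = [dask.delayed(run_simulation)(data_future, params) for params in parameters]
results = dask.compute(*simulations)

Is something like this the recommended approach, or is there a better way to share one large read-only object across all workers?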