Best practices for passing a large dictionary to a local cluster

I am trying to use Dask to run large-scale simulations. The pattern I am currently following is this:

  • Pull a large amount of data from the DB as a Python dictionary
  • Pass this data into dask.delayed
  • Run a large number of simulations

I am running into a few issues:

If I use all 8 available workers on my MacBook, each worker gets 2 GB of memory. I can't even pass my data into delayed, because the workers fail with "worker exceeded memory budget" errors.

If I use only 4 workers, so that each gets 4 GB of memory, I can successfully delay the data, but every worker then seems to load its own copy of the data into memory, and I am flooded with garbage collection warnings like this:
distributed.utils_perf - WARNING - full garbage collections took 25% CPU time recently (threshold: 10%)

I have also tried dumping my data into a JSON file and letting dask.delayed use json.load to read it in. But I run into the same problem: a worker does not have enough memory to read the file.

I am wondering what best practices are here. Ideally I would love to have the dictionary in my local memory and have each worker just be able to point to it if that is even possible. The goal is to be able to run these simulations with all available workers.

My current code looks something like this:

import dask
from dask.distributed import Client

client = Client()

data = {'key_1': {'data': {'blah': 'value'}}, 'key_2': {'more_data': {'blah_blah': 'values'}}}
delayed_data = dask.delayed(data)

parameters = [{"a": 1}, {"a": 2}]

simulations = []
for params in parameters:
    simulations.append(dask.delayed(run_simulation)(delayed_data, params))

results = dask.compute(*simulations)

Hi @nelsongriff and welcome to discourse!

Since you’re running this locally, one improvement could be to use the threaded scheduler. Threads share memory, so this avoids the cost of copying data between processes that the multiprocessing and distributed schedulers incur. Here’s an example:

import dask

data = {'key_1': {'data': {'blah': 'value'}}, 'key_2': {'more_data': {'blah_blah': 'values'}}}
delayed_data = dask.delayed(data)

parameters = [{"a": 1}, {"a": 2}]

def run_simulation(data, param):
    return id(data)

simulations = []
for params in parameters:
    simulations.append(dask.delayed(run_simulation)(delayed_data, params))

# With threads both tasks see the same object, so the ids match;
# with scheduler="processes" each process would get its own copy
results = dask.compute(*simulations, scheduler="threads")

The downside to the threaded scheduler is that, depending on the work being done in run_simulation, you could run into Python’s Global Interpreter Lock (GIL), in which case you won’t get any real parallelism. Can you elaborate a bit on what happens in run_simulation?

How big is the dictionary and could it be partitioned? If it can be partitioned, then you would be able to load the partitions in different processes locally (so you’re not GIL-bound) or even on different workers in a cluster, which would be a good option.
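
As a rough sketch of the partitioning idea, assuming the dictionary can be split by its top-level keys and each simulation only needs one partition at a time: partition_dict and build_simulations are hypothetical helper names, and the body of run_simulation is just a stand-in since the real one isn't shown here. It runs on threads to stay lightweight; the same graph could be handed to a process-based scheduler or a cluster.

```python
import dask


def partition_dict(data, n_partitions):
    """Split a dict into at most n_partitions smaller dicts by top-level key."""
    keys = list(data)
    size = max(1, -(-len(keys) // n_partitions))  # ceiling division
    return [
        {k: data[k] for k in keys[i:i + size]}
        for i in range(0, len(keys), size)
    ]


def run_simulation(partition, params):
    # Hypothetical stand-in for the real simulation.
    return {"a": params["a"], "keys": sorted(partition)}


def build_simulations(data, parameters, n_partitions):
    # traverse=False tells dask not to walk each dict looking for nested
    # delayed objects, which can be slow for large containers.
    partitions = [
        dask.delayed(part, traverse=False)
        for part in partition_dict(data, n_partitions)
    ]
    # One task per (parameter set, partition) pair; each task only
    # receives the partition it works on, not the whole dictionary.
    return [
        dask.delayed(run_simulation)(part, params)
        for params in parameters
        for part in partitions
    ]


data = {f"key_{i}": {"data": i} for i in range(6)}
parameters = [{"a": 1}, {"a": 2}]

simulations = build_simulations(data, parameters, n_partitions=3)
results = dask.compute(*simulations, scheduler="threads")
```

Whether this helps depends on whether run_simulation really can work on a slice of the data; if every simulation needs the whole dictionary, partitioning like this won't reduce the per-worker memory.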

If you’re unable to partition the data and run_simulation requires the GIL, then another option would be to use a hosted cloud provider with access to more memory.
