Best way to persist different datasets in scaling workers

I have an application that contains ‘zones’. Each zone loads a big dataset (that sadly can’t be partitioned). Each zone then fires hundreds of thousands of tasks that each read this dataset, perform some operations, and train a model.

Currently I launch zones with fire-and-forget; each zone loads its dataset and stores it on a shared volume, and each task then loads the dataset and does its operations.
But this creates a lot of redundancy, with the same dataset loaded from disk many times.

Now, I know there is a method called scatter that persists the data on the workers, and the scheduler will do its best to send tasks to workers that already hold the dataset.
But since the number of workers should grow with load, new workers won’t have the dataset preloaded (as mentioned in the docs, broadcast=True only affects the workers present at the time of the call). Scaling here means spinning up new pods in a k8s cluster.
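Roughly what I mean (just a sketch with hypothetical names, not my actual code):

dataset_future = client.scatter(dataset, broadcast=True)  # copied to the *current* workers only
futures = [
    client.submit(training_job, zone, job, dataset_future)
    for job in jobs
]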

My question is: do you have any recommendations, or are there features I’m missing, that could help solve this problem? Or is Dask Distributed not a good fit for this scenario? I’m also not opposed to creating the necessary workers manually, even if that’s not the ideal solution.

Thank you in advance.

Hi @edgar-s-silva-alb, welcome to the Dask community!

What would really help for a better answer here would be a small MVCE demonstrating your use case. That said, I’ll try to answer several points or ask for clarification.

How big?

That’s a lot of tasks! Any possibility to fire fewer of those?

Scatter shouldn’t be affected by scaling of Workers. If a new worker needs the data, it will be transferred from one worker to another. Dask should handle this case well.

Moreover, broadcast is disabled by default in order to avoid putting too much memory pressure on the cluster. But you shouldn’t need it in your case: just scatter your zones, or persist them in memory if the data is read from the Workers (which would be the better option).
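A minimal sketch of those two options, assuming a connected client and hypothetical helpers (load_dataset_from_database, training_job, jobs):

# Option 1: load on the client, then scatter into a Worker's memory
dataset = load_dataset_from_database(zone)
dataset_future = client.scatter(dataset)

# Option 2 (better): load directly on a Worker and keep the result there
dataset_future = client.submit(load_dataset_from_database, zone)

# Either way, pass the Future to the tasks instead of the data itself
futures = [client.submit(training_job, zone, job, dataset_future) for job in jobs]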

Hey, thanks for taking the time.

It varies but let’s say around 2 GB for each zone.

That’s the number of models I need to train; the only thing I could do is some sort of grouping.

How would I persist it in the workers’ memory?

So, to try to better explain the case, here is a simplified version of my current approach:

from dask.distributed import Client, fire_and_forget, as_completed, get_client


def orchestrator():
    cluster = ...  # Create a cluster with autoscaling workers (with resources for separating zones from jobs)
    client = Client(cluster)

    zones = [1, 2, 3]
    for zone in zones:
        fire_and_forget(client.submit(zone_training, zone))


def zone_training(zone):
    client = get_client()  # running inside a task, so reuse the worker's client

    dataset = load_dataset_from_database(zone)
    dataset = preprocess_dataset(dataset)

    # Right now, instead of passing the dataset, I'm passing its location on the shared volume
    dataset_location = save_dataset_to_shared_volume(dataset)

    jobs = []
    for job in dataset.jobs:  # Let's say there are 100 000 unique things to train in the dataset
        jobs.append(client.submit(training_job, zone, job, dataset_location, pure=False))

    for future, result in as_completed(jobs, with_results=True):
        print("Hey, I got my trained model")


def training_job(zone, job, dataset_location):
    dataset = load_dataset_from_shared_volume(dataset_location)

    processed_dataset = process_dataset(zone, job, dataset)
    model = train_model(processed_dataset)

    return model

The idea here is to make sure each worker keeps the dataset in memory for future jobs of the same zone, instead of what I’m doing currently, which is loading the dataset from the shared volume for every job.

Just be careful not to have every zone loaded in the same Worker’s memory at once… Not sure if that is really possible.

You might have to at some point, to avoid generating too many tasks.

I believe scatter would end up being almost the same. The important point is to read the data from a Worker, using Delayed or a Future to submit your reading method, then use Client.persist to make sure it is loaded and kept in memory.
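Something along these lines (a sketch, reusing the hypothetical helper names from your snippet):

import dask

# Build a lazy task that loads the zone's dataset...
lazy_dataset = dask.delayed(load_dataset_from_database)(zone)

# ...and persist it: the load runs once on a Worker and the result
# stays in distributed memory, ready to be reused by other tasks
persisted_dataset = client.persist(lazy_dataset)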

I guess you could just keep the Future pointing to your zone’s dataset once you’ve loaded it, and take this Future as input to your training_job instead of a disk location.
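A minimal sketch of what that could look like, reusing the hypothetical helper names from your snippet (the ds.jobs attribute and the load_and_preprocess wrapper are assumptions):

from dask.distributed import get_client, as_completed


def load_and_preprocess(zone):
    # Runs on a Worker; the returned dataset stays in that Worker's memory
    return preprocess_dataset(load_dataset_from_database(zone))


def zone_training(zone):
    client = get_client()

    # Future pointing to the in-memory dataset, loaded on a Worker
    dataset_future = client.submit(load_and_preprocess, zone)

    # Fetch only the job list, without pulling the whole dataset back here
    job_list = client.submit(lambda ds: list(ds.jobs), dataset_future).result()

    futures = [
        client.submit(training_job, zone, job, dataset_future, pure=False)
        for job in job_list
    ]
    for future, result in as_completed(futures, with_results=True):
        print("Hey, I got my trained model")


def training_job(zone, job, dataset):
    # Dask resolves the Future before calling this: `dataset` arrives already
    # in memory, transferred Worker-to-Worker if needed, with no disk read
    processed_dataset = process_dataset(zone, job, dataset)
    return train_model(processed_dataset)

With this, a new Worker that spins up during autoscaling receives the dataset over the network from a Worker that already has it, instead of re-reading it from the shared volume.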