Online Data Generation and Streaming for ML Applications

Context

I’m working on ML applications for Science where we want to generate data from numerical simulations that are typically MPI-based programs running on multiple cores. As we are running on a HPC cluster for which network bandwidth is assumed to be better than disk I/O, the idea is to stream directly the data from the simulations to the trainer via network. The trainer will then use the generated data to train a ML model.

The application have 3 components:

  • the server in charge of the training;
  • the runner in charge of managing the simulations (typically Client in Dask vocabulary);
  • the clients that run the simulations (typically worker in Dask vocabulary).

Pseudo Code

Server

from torch.utils.data import IterableDataset, DataLoader

class Dataset(IterableDataset):
    def __init__(self, runner_hostname: str):
         # Synchronize with the runner

    def __iter__(self):
          """Yield data generated by the clients and received through the network"""
          while self.is_receiving():
              data = self.receive()  # Receive data from the network
              yield data

    def submit(self, command):
          """Submit a command to the Runner through the network"""

dataset = Dataset(runner_hostname)
for params in range(10):
    dataset.submit(params)  # Ask for simulation to be run
dataloader = DataLoader(dataset)
for batch in dataloader:
    # Train the model
    ...
for params in range(10, 20):
    dataset.submit(params)  # Ask for more simulation to be run

Runner

with SlurmRunner() as runner:
    # The runner object contains the scheduler address and can be passed directly to a client
    with Client(runner) as client:
         while True:
             command = receive()  # Get new submission from Server
             client.submit(command)  # Launch the simulation

Client

for step in algorithm:
     # Do complicated computation
     send(data)  # Broadcast generated data over the network

In a HPC cluster setting, there will be two Slurm jobs one for the Server and one for the Runner and Clients.

Questions

My questions are quite generic as I am looking for pointers to where to start and for knowing if this could easily be done with an existing library add-on instead of re-implementing one.

Is Dask Distributed the Right Library?

I’m thinking Dask could be useful for this task. Especially, it could manage the execution of the different simulations. I’m also considering Ray. What should I have in mind to prefer dask over ray for managing the execution of the simulations? Are these library actually appropriate or am I looking for a simple job scheduler? I had a positive experience with dask to submit jobs in Slurm allocation (GitHub - jacobtomlinson/dask-runners: Dask Runners)

How to Handle Communications?

I’m trying to implement something like this framework, which initially relies on ZMQ for the communication between Client and Server. I saw that Dask is not using ZMQ (c.f. Should we use ZeroMQ? · Issue #776 · dask/distributed · GitHub) but has its own communication module based on Tornado. Is there some reason to favor Dask’s approach over ZMQ? Which components of Dask should I look into to avoid reinventing the wheel?

Hi @LTMeyer, welcome to Dask Discourse forum!

First question to begin: isn’t the time to run the simulation the bottleneck here? Will you be gaining a lot of performance by streaming results through network, and is it worth the pain of a complex setup? I mean, typically, won’t the PyTorch part book GPU resources for nothing waiting for data to arrive?

Do you plan on a multiple node job for the Runner/Clients?

Well, I guess both are appropriate, but maybe they are overkill for what you need? Do you need to use multiple nodes? Is it only submission of subjobs like in your example, embarassingly parallel ones? Have you for example looked at GitHub - facebookincubator/submitit: Python 3.8+ toolbox for submitting jobs to Slurm?

For communication between your data server and the runner, I don’t think you’ll be able to use Dask, ands you don’t want to try to use Dask communication. You’ll need to use lower level libraries, or something like ZMQ yes, I guess.

All the above are only some personal advice, don’t take all for granted.

Hi @guillaumeeb,
Thank you for the reply and the relevant questions.

Indeed, if we were producing data with one long and slow simulation, the simulation itself will be the bottleneck. However, in our application we assume data are produced by simulation that run relatively fast (each time step would typically take from few milliseconds up to few seconds to complete). For instance, we are considering cheap CFD simulations and not full weather forecast model. Moreover, we assume there are plenty of cheap CPUs available on the cluster we can use to concurrently run many simulation instances.
So, instead of generating data offline that would be stored on disk and then load from disk during training (which is slow and costly), we would have data streamed from running simulations. Note that both approaches can be combined (some loading from disk and simultaneous generation). A last point on this matter is that by having everything simultaneous we can loop data generation and training and then select training example based on on-going model performances.

Do you think it makes sense? Please feel free to challenge the idea.

As the Runner’s task is to orchestrate the execution of clients it will not need much resources and will be running only on one node. The clients however will run the MPI-based simulations that can indeed span on several nodes.

The simulations executed by the clients do not need to exchange information between them. In this sense, the execution of clients is embarrassingly parallel.
Thank you for pointing to submitit. From what I’ve seen it can submit jobs based on command (like mpirun) within a Slurm allocation. The Runner could thus totally relies on it to submit the execution of the clients.
For the execution of the clients we would need multiple nodes but I understand this can be handled transparently with submitit.

So my understanding is that I could have:

  • The runner based on submitit;
  • Communication between clients and server and between runner and server based on ZMQ.

Thank you, I’ll try to set up a prototype from these remarks.

Actually, submitit allows for submitting SBATCH jobs from Python. But it does not provide out-the-shelf Python functions to submit jobs within already allocated Slurm resources. The feature that GitHub - jacobtomlinson/dask-runners: Dask Runners provides with Dask. Nonetheless, it seems possible to add a new Executor class to submitit to handle job submissions within an existing Slurm allocation.

That still makes the setup a bit complex, but yes it makes sense, and it can probably optimize the workflow.

Or Dask, depending on what you do. I have the impression that submitit is closer to your need, but I might be wrong.

If you have experience with dask-runners, then this is probably a good solution!