Running DataFrame Partition Simulations in Parallel using dask.delayed()

Hi

I have a newbie query regarding running simulations in parallel using Dask.

I want to…

  • Read multiple JSON files into a Dask DataFrame

  • For each dataframe partition, run multiple simulations using dask.delayed()

  • Return the results as lists of lists for further processing.

My Jupyter Lab dummy code below seems to work well. dask.compute(combined_simulations) produces the correct results.

My query relates to the output of dask.visualize(), below. What are the finalize cells in the visualisation? And why are there 2 square output blocks before the finalize cells? They make me wonder whether I’m parallelising my code correctly. Also, I was expecting to see a cell for the partition_simulations() function, but there isn’t one in the visualisation.

Any suggestions/guidance would be much appreciated.

Kind regards

import dask
import dask.dataframe as dd
ddf = dd.read_json('*.json')
def simulation(df, i):
    values = []
    # Simulation is sequential in nature.
    for (value,) in df.itertuples(index=False, name=None):
        values.append(value * i)
    return values
def partition_simulations(df, n):
    sims = []
    for i in range(n):
        sims.append(dask.delayed(simulation)(df, i))
    return sims
# Conduct N simulations for each partition in the dataframe.

N = 5

combined_simulations = [partition_simulations(df, N) for df in ddf.partitions]
combined_simulations
[[Delayed('simulate-3be53040-d28a-4f69-8fac-575fb8d4e623'),
  Delayed('simulate-13b53a3f-7387-46e4-8b6f-5120db3def87'),
  Delayed('simulate-0b6a792d-5032-4adc-9275-cb9c1a72be40'),
  Delayed('simulate-2ba66a6e-f047-45a1-b8ac-1b8fc03d5c13'),
  Delayed('simulate-1e893ccf-e91a-472a-886e-b3d555e78e98')],
 [Delayed('simulate-43e84a6d-4052-45ad-99bd-7890279ad6c0'),
  Delayed('simulate-ff10861a-41b8-438f-90fa-f3c119826592'),
  Delayed('simulate-a0fc058a-efa4-4820-9b85-7136e39276f9'),
  Delayed('simulate-5b0fe710-f1c2-40fa-88f3-077c6b4f609f'),
  Delayed('simulate-f1b90a52-ec91-402f-91b5-18394b2f2264')]]
dask.visualize(combined_simulations)

Hi @JohnDuffy, welcome to the Dask community!

I have to admit that I don’t know what the finalize cells are. They are probably caused by iterating over the DataFrame partitions directly. I think it would be better to use DataFrame.to_delayed.

The first square is where the DataFrame is built, with two chunks (partitions). The second square appears because you then take the partitions one by one.

The partition_simulations function only builds a task graph; it is not used by any Dask collection, so Dask doesn’t know about it. Dask only knows that it has to run the simulation function.

Thank you, Guillaume.

Your comments make sense. I will experiment with DataFrame.to_delayed, as you suggest.

Kind regards

John
