Timing and frequency of execution of delayed functions

Hello all,

I have some difficulty understanding when and how many times Dask evaluates lazy instructions. In my application I need to work with xarrays, convert them, and store them to Parquet. The code executes with no error, and data integrity seems to be preserved, but I do not understand how Dask operates: it seems to call the delayed function more than once, and not the same number of times for each argument passed to it.

What follows is a simplified version of my general case to reproduce what I mean. I am basically using print statements to learn when dask triggers execution of lazy functions. My questions are at the end of the post.

  1. Importing modules and starting client
import xarray as xr
import numpy as np
import pandas as pd
import dask
import dask.dataframe as dd
from dask.distributed import print

from dask.distributed import Client
client = Client(
    n_workers=1,
    threads_per_worker=1,
    processes=True, 
    memory_limit='auto'
)
client
  2. Defining a delayed function that takes as input a statement to print, generates an xarray Dataset, flattens it to a pandas DataFrame, prints the statement, and returns the DataFrame
@dask.delayed(nout=1, pure=True)
def generate_xr(to_print):
    ds = xr.Dataset(
        {
            'var1': (['dim1', 'dim2'], np.random.rand(3,2) ),
            'var2': ('dim1', np.random.rand(3) )
        },
        coords = {
            'dim1': [1, 2, 3],
            'dim2': ['a', 'b']
        }
    )
        
    df = ds.to_dataframe()
    df = df.reset_index() #flatten dataframe
    print(to_print)
    return df
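As a side note on the `pure=True` keyword used above: it makes the task key a hash of the function and its arguments, so two identical calls collapse into a single task. A minimal sketch (my own example, not from the post) illustrating this:

```python
import dask

@dask.delayed(pure=True)
def f(x):
    return x + 1

a = f(1)
b = f(1)
c = f(2)

# pure=True hashes the inputs into the task key, so identical calls
# share the same key and would only run once per graph
print(a.key == b.key)  # True: same function, same argument
print(a.key == c.key)  # False: different argument
```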
  3. Main code that calls generate_xr() three times, passing a different print statement each time, builds a common dataframe from the three returned, repartitions it, and stores it to disk in Parquet format.
df = []
for j in range(3):
    df.append( generate_xr(str(j)) )
    
df_all = dd.from_delayed(df)
df_all = df_all.repartition(partition_size="300MB")
df_all.to_parquet(
    "./output/",
    engine="pyarrow",
    write_metadata_file=True,
    write_index=False
)

When I execute this code, the screen displays the following output:

0
2
1
0
2
1
0

My questions are:

  1. Why is generate_xr() evaluated more than once per print statement passed to it (i.e. more than three times in total)? I expected it to be executed by to_parquet(), or, if executed earlier (e.g. by from_delayed() or repartition()), I expected the result to remain available for later operations.
  2. Why is generate_xr() called one extra time with the argument j=0?
  3. If I add print statements to the main code (part three), they are printed separately from the workers’ statements (i.e. in my Jupyter notebook the workers’ statements appear under the cell where the client is created, and any main-code statements under the part-three cell), so I cannot tell which function in the main code triggers the execution of the lazy functions. Is there a workaround to print all statements sequentially in the same cell output?
  4. Is the order of the print statements different from 0, 1, 2 because no order is enforced when subprocesses are spawned? In that case I would expect the printed order to change across executions of the code, but on my machine it doesn’t: it is always the one reported above.

Edit: by breaking down the main code over multiple cells in my notebook, I see that the first ‘0’ is printed when from_delayed() is called; the rest of the output is printed when to_parquet() is called. I guess that the 2,1,0 order then is due to executing operations in the reverse order that they were stacked. I am still confused as to why from_delayed() executes generate_xr() (and only for j=0) and why to_parquet() executes it twice and not just once.

Thanks a lot for any insight!

PS: I know that xarray and Dask integrate naturally, and things could probably be done slightly differently, but the point of this exercise is to understand Dask laziness (and my real case is more complex than this!)

Hi @enrico, welcome to Dask community!

Thanks for this detailed post and the reproducible example.

I think the explanations here are the following:

  • The first “0” print comes from inferring the columns and dtypes of the Dask DataFrame when building it with from_delayed. If you specify the meta keyword, it goes away:
df_all = dd.from_delayed(df, meta={"dim1": "int", "dim2": "object", "var1": "float", "var2": "float"})
  • The duplicated prints come, I think, from how the print function is defined: you get one print from the Worker, and another when it is forwarded back to the Client…
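As a variant of the meta suggestion above (a sketch on my part, with dtypes guessed from the example dataset): an empty pandas DataFrame with the right columns and dtypes is also accepted as meta, and it likewise avoids the eager computation that dtype inference triggers.

```python
import pandas as pd

# Empty DataFrame whose columns/dtypes mirror what generate_xr() returns;
# dask.dataframe accepts this as the `meta` argument of from_delayed.
meta = pd.DataFrame({
    "dim1": pd.Series(dtype="int64"),
    "dim2": pd.Series(dtype="object"),
    "var1": pd.Series(dtype="float64"),
    "var2": pd.Series(dtype="float64"),
})
# df_all = dd.from_delayed(df, meta=meta)
```

With meta supplied, from_delayed no longer needs to run any of the delayed functions up front.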

That makes sense, thanks a lot!
