How to handle a Dask DF in multiple modules?

hi,

so I’m trying to introduce Dask into a piece of software that consists of multiple modules called one after another to process dataframes. The modules to be called are listed inside a config file.

So far I managed to “change/convert” most of the Pandas dataframes into Dask dataframes, but I don’t know how to handle the dataframes when they are passed to another module, as processing differs in each one. I’ll try to explain how it works; multiple file types are supported (e.g. csv, npy files) and the files mostly won’t fit into the machine’s memory:

First off, the pipeline is executed. It takes information from the config file and writes new information into it. Next the “manager” is called, and this is where everything starts: a Pandas dataframe is created. Then a loop iterates through each module listed inside the config file. Each module’s process method receives the dataframe created above as a parameter. Inside the first module, the data is extracted from the input file with some modifications and then returned to the “manager”. On the next iteration the next module is called and receives the just-processed dataframe (here I converted the manager’s df into a Dask one and passed that through).
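Roughly, the manager loop looks like this (a simplified sketch; the module loading and method names are placeholders, not the real ones):

import importlib
import pandas as pd

def run_pipeline(config):
    # the "manager": create the initial dataframe and push it through every module
    df = pd.DataFrame()
    for module_path in config["modules"]:  # modules listed in the config file
        module = importlib.import_module(module_path)
        df = module.process(df)  # each module returns the (possibly modified) dataframe
    return df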

The problem is that processing differs. I can pass the Dask df to the first 3 modules (mostly columns are manipulated there), but then I have to convert it back to a pandas df with compute() and then into a numpy array so it can be processed with pytorch (the values are processed here). After pytorch, I create a new Dask df with the same number of partitions inside that module and return it to the manager (I currently have only 1 partition, which I will change when I know more about it).
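What happens around the pytorch step, roughly (a simplified sketch; model and the torch.from_numpy conversion are stand-ins for what the real module does):

import dask.dataframe as dd
import pandas as pd
import torch

model = torch.nn.Identity()  # placeholder for the real network

def torch_module_process(ddf):
    # materialize the columns pytorch needs as a numpy array
    arr = ddf[["posX", "posY", "posZ"]].compute().to_numpy()
    out = model(torch.from_numpy(arr)).numpy()
    # wrap the result in a new Dask dataframe before handing it back to the manager
    return dd.from_pandas(pd.DataFrame(out, columns=["posX", "posY", "posZ"]), npartitions=1)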

The last module creates a file of the input type and writes the dataframe into it. The df is then still returned to the manager, but no further computation happens there; from the manager the df is returned to the script that starts the software, but nothing happens there either.

Since it’s lazy and I have to call .compute() so that the computation is actually applied, my questions are: When would be the best point to call compute? Should I just make sure to pass the same df as long as possible until calling compute()? Should I rather call compute() in every module and return it as a pd df (I can imagine memory problems happening here), or compute() inside every module, create a new Dask df and return that? Do I also need to call .compute() again before writing anything into a file, or does that happen inside the write method itself?

I hope my questions are halfway understandable :sweat_smile:

Hi @MarcoPasta,

It’s a little difficult to grasp all that you’re doing here; maybe a small code sample would help. Anyway, I’m going to try to answer. I think your first question sums it all up:

If your data is too large to fit in memory, then you must not call compute. Before calling compute, you need to make sure your data has been reduced and will fit in memory. At that point you don’t need Dask anymore and can go back to Pandas.

There are some great blog posts or documentation here and there:
https://www.coiled.io/blog/dask-compute

So I’d say yes, delay the call to compute as long as possible if you don’t need the result or if the data is too big. You should not call compute in every module; this would cause a lot of delays or errors, as you said. And really importantly, if your end goal is to write to a file, you don’t need to call compute at all: just use to_parquet or any similar format that can be written in parallel.
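For example (a minimal sketch, assuming a Dask DataFrame built from your input files; the file names and the filter are placeholders):

import dask.dataframe as dd

ddf = dd.read_csv("input-*.csv")   # lazy read
ddf = ddf[ddf["edep"] > 0]         # lazy transformations from your modules
# writing triggers the computation itself, partition by partition, no compute() needed
ddf.to_parquet("output/")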

Thanks for replying!

So far I call compute() about 2 times now. I’ll try to reproduce the situations:

The first call is inside a for loop:
The DF is divided into batches. Each batch is processed within one iteration. For that, a lower bound “l” and an upper bound “u” are set. The DF is then sliced between those bounds and passed to a method:

This was the old version:

pixels: List[List[dict]] = []
pixels.extend(method(df[l:u][["posX", "posY", "posZ"]].values))

Behind this method there is a pytorch.read_numpy() call that expects a numpy array.
This is how I am trying to recreate it:

pixels: List[List[dict]] = []
# only convert the batches of the dask DF to a pd DF and pass it
npdf = df.iloc[:, l:u].compute()
pixels.extend(method(npdf[["posX", "posY", "posZ"]].values))

However, multiple constellations failed with different errors like KeyError: "None of [Index(['posX', 'posY', 'posZ'], dtype='object')] are in the [columns]", so I ended up with:

pixels: List[List[dict]] = []
npdf = df.compute()
pixels.extend(method(npdf[l:u][["posX", "posY", "posZ"]].values))

Which is not really best practice, as I convert the full DF into a pandas one here and pass it. Currently it fits, but I can imagine that with larger datasets it won’t fit anymore and that I maybe have to slice the Dask partitions in this for loop (if that is even possible)…
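One idea (just a sketch, assuming the manual bounds can be replaced by partition boundaries) would be to iterate over the partitions instead of computing the whole DF:

from typing import List

pixels: List[List[dict]] = []
# process one partition at a time instead of computing the full DF
for part in df.partitions:
    npdf = part.compute()  # only this partition is materialized in memory
    pixels.extend(method(npdf[["posX", "posY", "posZ"]].values))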

The second time I call it is in the output module:

def _save_h5(self, df, filename):
        filename = filename + ".h5"
        pdf = df.compute() # call this to convert back to pandas first
        with pd.HDFStore(filename, complevel=9 if self.compress else 0, complib="blosc") as h5store:
            h5store.put(key=self.h5_key, value=pdf, format="table", index=False)

Simply explained, I haven’t had the time yet to figure something out, but the output file gets created and then the result for every key gets stacked on top of it. I haven’t found a solution for Dask, as from what I read in the docs it doesn’t support a method like “put” yet.

I have to mention that the current situation is for h5 files, but there will also be similar situations with csv files where I cannot process every single key.

I hope replying to my own last message is ok:

So far I’ve had a little progress, but I am still stuck on my first problem:

As the for-loop was just chunking manually, I sliced my Dask DF into 10 partitions and used the df.map_partitions() method instead. However, I feel like map_partitions() is not doing what I hoped it would do, or I’m just using it completely wrong.

I have a list of pixels that I want to extend with each “iteration” of the for loop, or with each partition processed from the DF. The extension happens in the detector, which takes numpy arrays and returns the datatype of the pixels declared in __init__.
In the for-loop, around 10k columns were passed to the method; now with partitions it’s around 13-14k, which should be fine.

The code looks like this so far (not original):

class Module:
    def __init__(self):
        self.pixels: List[List[dict]] = []

    def _extend_pixels(self, df, detector: Detector):
        # are those dfs computed here?
        self.pixels.extend(detector.get_pixels_bulk(df[["posX", "posY", "posZ"]].values))
        print("Type of pixels: ", type(self.pixels))
        print() # some more prints to check the data

    def process(self, df, detector, etc):
        df = df.map_partitions(self._extend_pixels, detector)
        pdf = df.compute() # edited this because self.pixels was empty
        print("Pixels: ", self.pixels[:10]) # this was empty before but now has values

However, I also have to mention that I ran into a lot of errors while trying to do this and that some things were pretty weird:

  • Even though I have 10 partitions, some prints showed up around 21 times
  • Sometimes, prints were just empty. When I printed "DF: ", the output was just empty. Checking if the df in this “iteration” is empty also failed, as it returned False.
  • Edit: the pixels thing kind of works, but gets filled with 2 empty lists at the beginning. Is there a way to avoid this? Also, it’s only filled when I run compute() before.
  • Edit 2: after I dug deeper into the results I saw that the 2 empty lists at the beginning should actually be a layer 0 and a layer 1…

Also, in the previous module that runs before this operation, I saw an operation that involved groupby and reset_index(drop=True). After checking, once it completes the number of partitions gets reset to 1.
I have appended a .repartition(npartitions=10) at the end of it, but here I raise the question:

Since a lot of values get filtered out and/or collapsed through mean aggregation, do I need to run compute() before returning this DF for use in the next pixel module, or is the “change” noted in this DF and applied automatically when I run map_partitions?

So yes, as you noticed and corrected in your next post, this is not the right way to go; doing this, there’s no point in using Dask at all. You could probably just use Pandas and read by chunks.

Here, it’s not the correct thing to do either. Somehow, you should try to use the to_hdf method.
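For instance (a minimal sketch, assuming df is a Dask DataFrame; the filename and key are placeholders):

# every partition is appended in turn; no explicit compute() is needed,
# since to_hdf triggers the computation itself by default
df.to_hdf("output.h5", key="/data", complevel=9, complib="blosc")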

This is the correct way to go! But I think the code below this quote doesn’t take a correct approach. It may be kind of working right now if you use only the threaded scheduler (and even then I’m not sure, you might run into race conditions), but it won’t work if you start using the multiprocessing scheduler or a distributed one. You’re trying to update an object attribute from different threads/processes concurrently, and this is not good.

Instead, you should try to define a method that takes a Pandas DataFrame argument and outputs your Array, and call it from map_partitions, not the other way around with map_partitions inside your module. That way, your main code could either call the module in a for loop with only Pandas, or through Dask with map_partitions.

What I have in mind is something like this:

def process(df, detector, etc):
    return detector.get_pixels_bulk(df[["posX", "posY", "posZ"]].values)

# Main process
my_dask_array = my_dask_dataframe.map_partitions(process, detector, etc)

Then you should somehow process this Dask Array, which should be one big Array composed of all the outputs of each input DataFrame partition.

21 times sounds weird if you have only 10 partitions. However, if you don’t provide any meta information to map_partitions, Dask will try to infer the output of the method given in argument, and so calls it once with an empty DataFrame. This is why you see some extra calls and empty results at the beginning. But anyway, as said before, you shouldn’t try to update an object attribute inside map_partitions calls.
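For example, in your process method you could pass meta explicitly so Dask skips that inference call (a sketch; the column names and dtypes are just an assumption about what _extend_pixels returns):

import pandas as pd

# describe the output so Dask doesn't have to call _extend_pixels on a fake DataFrame
meta = pd.DataFrame({"posX": pd.Series(dtype=float),
                     "posY": pd.Series(dtype=float),
                     "posZ": pd.Series(dtype=float)})
df = df.map_partitions(self._extend_pixels, detector, meta=meta)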

Are you doing some manipulations on the Dask DataFrame before calling map_partitions?

I don’t understand this part. Do you have a code sample?

Hi @guillaumeeb and thanks for replying!

A lot of stuff happened overnight, so I’ll try to explain it:

First off, let’s start with this:

Are you doing some manipulations on the Dask DataFrame before calling map_partitions?

Yes, the input_file comes with a lot of values that are either duplicate occurrences or empty. In both cases, those are either dropped or only the first one is picked. The coordinates are then averaged, and for 1 column a sum is generated:

agg = {}
for col in df.columns:
    agg[col] = "first"
agg["posX"] = "mean"
agg["posY"] = "mean"
agg["posZ"] = "mean"
agg["edep"] = "sum"
return df.groupby(self.group_columns, group_keys=False, sort=False).aggregate(agg).reset_index(drop=True)#.repartition(npartitions=10)

This is what happens inside that module. Judging from looking at the raw input and the DF processed after this module, roughly

What I have in mind is something like this:

Next, the process method does some more things than just return the pixel manipulation; that’s why I need to call map_partitions() inside of it (and also compute())… I found out that the first run is a kind of test run to check the return value of that method, so I looked at the values and created a way to “filter” the test run from being put into the pixel list. Still, the list is getting extended inside the method that is used for map_partitions.

def _extend_pixels(self, df, detector: Detector):
    if np.all(df.values[0] == df.values[0][0]): # filter test runs
        return df
    testpix = detector.get_pixels_bulk(df[["posX", "posY", "posZ"]].values) # used to check every partition
    self.pixels.extend(testpix)
    return df


def process(self, df: pd.DataFrame, detector: Detector, metadata: PipelineMetadata) -> pd.DataFrame:
    # pixels: List[List[dict]] = [] # used before
    df = df.repartition(npartitions=10) # just making sure
    df = df.map_partitions(self._extend_pixels, detector) # needed so that the pixels list has values
    pdf = df.compute(num_workers=2)
    # Working combinations so far:
    # 10 partitions, 2 workers

    pixels_df = pd.DataFrame(list(itertools.chain(*self.pixels)), dtype=int)
    repeats = list(map(len, self.pixels))

    # print("Len of pdf.values: ", len(pdf.values))
    # print("Len of repeats: ", len(repeats))

    repeated_values = np.repeat(pdf.values, repeats, axis=0) # this needs numpy, so maybe use map_partitions here again?

    new_df = pd.DataFrame(repeated_values, columns=df.columns) # impractical
    new_ddf = dd.from_pandas(new_df, npartitions=10)

    # some column changes here and then it's returned

This is the way that worked so far (I haven’t checked the output for correctness, but it looks good). However, I had to limit the number of workers in compute() due to CUDA VRAM issues. I’ll try to look deeper into this.

def process(df, detector, etc):
    return detector.get_pixels_bulk(df[["posX", "posY", "posZ"]].values) 

# Main process
my_dask_array = my_dask_dataframe.map_partitions(process, detector, etc)

If you could explain to me how to return the pixel list in this case, it would help me a lot and I could optimise it that way. The size of the full pixel list is just a few MB, so that should be fine I guess. The previous approach always produced a kind of Dask Series object that was not suited for my use case… also, the pixel list is used to create a new list of repeats.

So lastly the output:
I just tested around a little and from my experience now, dd.to_hdf should do the exact same thing. However, do I need to give the compute= parameter? I tried without, read the file back into pandas and it was pretty much the same, so I guess it should work. I will give it a try :slight_smile:

If I understand correctly, here you are performing an aggregation over the whole DataFrame, so you should end up with much less data, shouldn’t you? Could you consider just computing this aggregation and following up with a Pandas DataFrame if it fits in RAM?

I guess the main question is: how many different keys do you have in group_columns?
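For instance (a rough sketch, reusing your agg dict and group_columns; only worth doing if the aggregated result really fits in memory):

# reduce with Dask, then continue with plain pandas
pdf = (df.groupby(self.group_columns, group_keys=False, sort=False)
         .aggregate(agg)
         .reset_index(drop=True)
         .compute())  # pdf is now a plain pandas DataFrame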

If you still need to continue, there is something that I think could be optimized. I see you’re building a new DataFrame at the end of the process method. But you could probably build it directly as a Dask one. Something like:

def _extend_pixels(self, df, detector: Detector):
    if np.all(df.values[0] == df.values[0][0]): # filter test runs
        return df
    testpix = detector.get_pixels_bulk(df[["posX", "posY", "posZ"]].values) # used to check every partition
    return pd.DataFrame(testpix)

def process(self, df: pd.DataFrame, detector: Detector, metadata: PipelineMetadata) -> pd.DataFrame:
    # pixels: List[List[dict]] = [] # used before
    df = df.map_partitions(self._extend_pixels, detector) # each partition now returns its pixels as a DataFrame

For now, I don’t quite get your repeated_values part.

You don’t, it’s True by default, so it will trigger the computation upon calling.