Hi,
So I’m trying to integrate Dask into a piece of software that consists of multiple modules, called one after another, to process dataframes. The modules to be called are listed in a config file.
So far I’ve managed to convert most of the pandas DataFrames into Dask DataFrames, but I don’t know how to handle the dataframes when they are passed from one module to the next, since the processing differs in each one. I’ll try to explain how it works. Multiple file types are supported (e.g. csv and npy files), and most of the files won’t fit into the machine’s memory:
First off, the pipeline is executed. It reads information from the config file and writes new information into it. Next, the “manager” is called, and this is where everything really starts: a pandas DataFrame is created. Then a loop iterates through each module listed in the config file. Each module’s process method receives the DataFrame created above as a parameter. Inside the first module, the data is extracted from the input file (with some modifications) and then returned to the “manager”. In the next iteration, the next module is called and receives the just-processed DataFrame (this is where I convert the manager’s df into a Dask one and pass that through).
The problem is that the processing differs. I can pass the df through the first 3 modules (they mostly manipulate columns) until I have to convert it back to a pandas df with compute() and then into a NumPy array so it can be processed with PyTorch (this is where the values are processed). After PyTorch, I create a new Dask df inside that module with the same number of partitions, pass the pd df into it and return it to the manager (I currently have only 1 partition; I’ll change that once I know more about it).
The last module creates a file of the input type and writes the dataframe into it. The df is then still returned to the manager, but no further computation happens there. From the manager, the df is returned to the script that starts the software, and nothing happens there either.
Since Dask is lazy and I have to call .compute() for the computation to actually be applied, my questions are: When would be the best point to call compute()? Should I just make sure to pass the same df along for as long as possible before calling compute()? Should I rather call compute() in every module and return a pd df (I can imagine memory problems happening here), or call compute() inside every module, create a new Dask df and return that? And do I need to call .compute() again before writing anything to a file, or does the write method take care of that itself?
I hope my questions are at least halfway understandable.