Seeking Feedback on Dask Implementation for Custom Function Application

Hi everyone,

I’m new to Dask and excited to learn more about its capabilities! I’m currently working on a project that doesn’t seem to fit the typical Dask DataFrame or Array use cases, so I’d love to get your thoughts on my approach.

Here’s what I’m trying to accomplish:

  • I have 9 input DataFrames with 5 columns each. Their row counts start at about 1 million and drop to about 4,000, roughly halving with each subsequent DataFrame.
  • I need to apply ~200,000 custom functions to these DataFrames. The functions vary in complexity, taking anywhere from 0.1 seconds to 2 minutes each, which adds up to roughly 2,000 CPU-hours in total. Each function requires access to the entire DataFrame (not partitions), because the results would differ otherwise.
  • Each function generates a new column with the same row count as the input DataFrame.
  • I read the input DataFrames from Parquet files and write the resulting columns to Parquet files stored on disk (and eventually to AWS S3).
  • The current implementation computes all tasks in parallel and writes the results to disk; each function also returns a float metric so I can monitor execution performance.

Here’s a simplified pseudo-code version of what I have so far:

import dask
import dask.dataframe as dd

inputs = []
for file in input_files:
    inputs.append(dd.read_parquet(file, split_row_groups=False))

# One delayed task per (function, input DataFrame, parameter set) combination
tasks = []
for f in functions:
    for df in inputs:
        for parameter_set in parameter_sets:
            tasks.append(dask.delayed(f)(df, parameter_set))

results = dask.compute(*tasks)

Challenges

  1. The input DataFrames vary significantly in size, which might affect task scheduling.
  2. The functions have very different execution times, which could lead to resource underutilization or bottlenecks.

Questions

  • Does this approach make sense for my use case?
  • Are there strategies or best practices for handling tasks with such diverse input sizes and execution times in Dask?
  • Would using a different part of the Dask API make this more efficient?
  • Any advice on scaling this to AWS S3 would be appreciated! So far, I have only tested the code on my local machine with 2,000 functions, but I need to run it on AWS for all 200k functions. I am planning to use Coiled.

Thank you so much for your time and help! I’m grateful for any feedback, pointers, or suggestions you can provide, even if it’s just to direct me to relevant documentation or examples.

Looking forward to hearing your thoughts!

Hi @lutfis, welcome to the Dask community!

This approach makes sense; however, I believe there are a few other things to consider:

  • How big are your DataFrames in memory? You should at least wrap these objects in delayed or scatter them, or better yet, read them on the worker side; otherwise the data gets embedded in the task graph and you will generate a very large graph (see the sketch at the end of this reply).
  • How are you writing columns? Do you add 200,000 columns to each DataFrame? How do you write them to Parquet? One by one?
  • You might want to process files one by one, especially if you apply 200,000 operations on each file.

I believe Delayed or Futures are the right approach here.

I don’t see any bottleneck here apart from the writing step, but I’m not sure I understand yet when and what you are writing.
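
To make the first point concrete, here is a minimal sketch of the Futures/scatter variant, reusing the hypothetical names (input_files, functions, parameter_sets) from the pseudo-code above; it assumes each function returns its float metric, and is meant as an illustration rather than a drop-in implementation.

import pandas as pd
from dask.distributed import Client

client = Client()  # a local cluster here; a Coiled cluster later

futures = []
for file in input_files:
    # Load the full DataFrame once and pre-place it on the workers, so the
    # task graph only carries a small reference instead of the data itself.
    df = pd.read_parquet(file)
    df_future = client.scatter(df, broadcast=True)
    for f in functions:
        for parameter_set in parameter_sets:
            futures.append(client.submit(f, df_future, parameter_set))

metrics = client.gather(futures)  # the float metrics returned by the functions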

Hi @guillaumeeb, thank you!

Thanks for reviewing my question and providing detailed feedback. Here, I will try to answer your questions:

The input files in Parquet format range from 180 MB down to 25 KB. I am reading them like this: dd.read_parquet(file, split_row_groups=False). I believe the workers are reading the files each time before a function is applied. Is this correct? Does this code delay the read operation? This approach causes us to read the same file multiple times, but I could not think of a better way of doing it.

Each function creates a DataFrame with one column and writes that DataFrame directly to a Parquet file. This all happens inside a delayed function, so it runs on the workers one task at a time. Here is the delayed function in case it is helpful:

from dask import delayed

@delayed
def apply_feature_function(feature, prepared_ohlcv):
    # Look up the feature function by name and apply it to the full DataFrame
    feature_func = getattr(feature_functions, feature.function_name)
    result = feature_func(prepared_ohlcv,
                          feature.window1, feature.window2, feature.window3)
    result.to_parquet(f'data/basic_features/basic_feature_{feature.col_name}.parquet')
    # Switch to S3 after deploying the code to AWS
    # result.to_parquet(f's3://aws-s3-bucket-name/basic_feature_{feature.col_name}.parquet')

Yes, this makes sense. Since my files vary so much in size, processing them one by one would be better. Thank you for this suggestion. The smallest files might not even need Dask.

Thanks again, and I look forward to hearing more suggestions if you have any.

This does not necessarily mean you read the files multiple times. You could also persist the DataFrame into memory. However, since you are working with the entire DataFrame in each function, I would probably just wrap a plain pd.read_parquet call in Delayed, and then call the other functions on this Delayed object (see the sketch below).
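
For illustration, a minimal sketch of that suggestion with plain pandas reads wrapped in Delayed, again assuming the hypothetical input_files / functions / parameter_sets names from earlier in the thread:

import pandas as pd
import dask

tasks = []
for file in input_files:
    # The read itself becomes a task that runs on a worker; all function
    # tasks for this file depend on it, so the file is read only once.
    df = dask.delayed(pd.read_parquet)(file)
    for f in functions:
        for parameter_set in parameter_sets:
            tasks.append(dask.delayed(f)(df, parameter_set))

results = dask.compute(*tasks)

The difference from your original snippet is that dd.read_parquet builds a lazy Dask DataFrame, whereas delayed(pd.read_parquet) produces a single task whose output is a plain pandas DataFrame, which is what your functions need anyway.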

Ok, perfect, embarrassingly parallel.

Not necessarily better, but this might be worth a try.
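
If you do try the one-file-at-a-time route, here is a rough sketch (same hypothetical names as above); it is only worth it if a single file's batch of tasks can keep the cluster busy:

import pandas as pd
import dask

for file in input_files:
    df = dask.delayed(pd.read_parquet)(file)
    batch = [
        dask.delayed(f)(df, parameter_set)
        for f in functions
        for parameter_set in parameter_sets
    ]
    # Compute one file's batch before moving on, so only one input
    # DataFrame needs to live in distributed memory at a time.
    metrics = dask.compute(*batch)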

Thank you so much @guillaumeeb for clarifying the file reading process and for your suggestion about using pd.read_parquet with Delayed. I’ll explore persisting the DataFrame and see how processing files one by one impacts performance. Your insights have been really helpful, and I appreciate your time and support!