Best Practice for converting a function that takes multiple pandas dataframes into one that takes multiple dask dataframes?

Let’s say we have a function that takes in 2+ pandas dataframes as inputs and returns a pandas dataframe as an output. What’s the best practice for converting this from pandas into dask?

For functions that take a single pandas dataframe, it’s simple enough to call something like df.map_partitions(fn) and have it work. Is there a way to do this for functions that take multiple dataframes? For example, see the mock code snippet below.

import pandas as pd
from dask import dataframe as dd

# method
def pandas_method(
    dataframe_1: pd.DataFrame,
    dataframe_2: pd.DataFrame
) -> pd.DataFrame:
    # black box function, assume it returns a new dataframe that is
    # derived from inputs, inputs don't necessarily have 1:1 mapping for records
    return result

# a simple runner to execute the code
def runner():
    # api calls to get data, assume it comes in as dask dataframes
    foo = api.read_dask("foo")
    bar = api.read_dask("bar")

    # get result from pandas function
    result = pandas_method(foo, bar)

    # api call that does something with result

Hi @selenehines, welcome to the forum!

I’m afraid that what you have in mind, passing two Dask dataframes straight into a pandas function, is not possible in general in a distributed context.

Either your pandas_method can process each DataFrame independently (i.e. no correlation between the records of the two DataFrames), in which case you could split the method in two and call each part on its DataFrame separately (possibly with map_partitions, if it works).
Or there are relations between the DataFrames, e.g. you need to do joins or other things, in which case you’ll probably have to rewrite your algorithm a bit to make it compatible with distributed execution.

Another way to put it is: how would you want Dask to align chunks correctly so that map_partitions would take the correct portion of each Dask dataframe every time?

df.map_partitions works easily on a single DataFrame: it just applies a function to every chunk (partition) of it, which is an embarrassingly parallel problem. It wouldn’t be possible to do this on several DataFrames whose chunks are independent of each other.

I hope this clarifies things a bit, but maybe I missed something in your first post. If so, please give some more details.