Accessing columns gets slower after adding new columns to a dataframe, because the new columns are always computed

Hi,
I have a question related to adding columns to dataframes.
My use case is as follows:
I have quite a large dataframe (actually stored in Parquet files on disk) that consists of multiple partitions.
I want to add new columns to that dataframe, which are based on other existing columns. Some of those operations are quite expensive.

After adding new columns I want to access different columns individually.
However, operations get much slower: it seems that after adding new columns to the dataframe, the new columns are always calculated, even when they are not needed.

E.g. consider the following minimal example code (it does not use Parquet files, but I can reproduce the same behavior with them):

import pandas as pd

from dask.dataframe import from_pandas

# create a simple pandas dataframe with two example columns
df = pd.DataFrame({'a': [0, 1, 2],
                   'b': [2, 3, 4]})


ddf = from_pandas(df, npartitions=1)

# accessing a column yields a quite simple graph (of course!)
ddf['a'].visualize('graph_1.png')

# a new column that is based on other columns
c = ddf.a * ddf.b

ddf_new = ddf.assign(c=c)

# however, when I now access one of the original columns, 'a', the new column 'c' is still computed although it is not needed
ddf_new['a'].visualize('graph_2.png')

It results in the following images:

left image:
accessing column 'a' of the original dataframe.

right image:
accessing the same column in the new dataframe; it can be seen that column 'c' is also calculated, although it is not needed.

Is there a possible solution/workaround for adding columns to a dataframe using Dask without that overhead?

BR,
Tom

Hi @ebnertom, welcome to Dask community!

Thanks a lot for this clear post and the reproducible example.

I think you want to look at the graph optimization possibilities.

I just called the optimize function on your final output, and it simplifies the graph appropriately:

from dask import optimize

optimize(ddf_new['a'])[0].visualize('graph_3.png')

Output:
[graph_3.png: the simplified graph]

To be honest, I know Dask optimizes the graph before execution, so it is possible that Dask was already doing this under the hood at compute time.
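
If you want to double-check what compute would actually run, visualize also accepts an optimize_graph flag that applies the same optimization pass compute uses by default (a small sketch, reusing ddf_new from above; the output filename is just an example):

# render the graph as it would look after the optimization pass
# that compute() applies by default before execution
ddf_new['a'].visualize('graph_3_optimized.png', optimize_graph=True)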

Hi,
thank you very much, your solution seems to work for my provided minimal example.
However, in my actual use case I am also using the map_partitions function, and it seems that it does not work in that case:

import pandas as pd
from dask import optimize
from dask.dataframe import from_pandas

# create a simple pandas dataframe with two example columns
df = pd.DataFrame({'a': [0, 1, 2],
                   'b': [2, 3, 4]})


ddf = from_pandas(df, npartitions=1)


# accessing a column yields a quite simple graph (of course!)
ddf['a'].visualize('graph_1.png')

def my_function(df):
    print(f'my_function called ({len(df)})')
    return pd.DataFrame.from_dict({'c': df['a'] + df['b']})

ddf_mapped = ddf.map_partitions(my_function)
ddf_new = ddf.assign(c=ddf_mapped['c'])

# however, when I now access one of the original columns, 'a', the new column 'c' is still computed although it is not needed
optimize(ddf_new['a'])[0].visualize('graph_2.png')


print('calculating ddf:')
ddf['a'].compute()
print('calculating ddf_new:')
optimize(ddf_new['a'])[0].compute()

The resulting graphs seem okay in that case; however, according to the output, my_function still gets called even when only column 'a' is retrieved.

see output:

my_function called (2)
calculating ddf:
calculating ddf_new:
my_function called (3)

You’re right, I can reproduce the behavior.

I think the optimization is actually only fusing the operations into one task, but they will all still be executed, even if you only access a column that doesn't need them.

I guess that is the way the task graph works underneath; Dask is not smart enough to simplify it because the new DataFrame is built using map_partitions at some point.
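
One rough way to see this (a sketch against your example; the exact task names and counts depend on the Dask version) is to inspect the optimized graph directly:

# list the tasks left after optimization: the work done by my_function
# is still in the graph, just fused into fewer tasks
opt = optimize(ddf_new['a'])[0]
print(list(opt.__dask_graph__().keys()))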

I guess the best way to go is either to keep both the untouched DataFrame object and the new one in memory, or to compute all the results first before accessing columns.
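
In code, the two workarounds could look roughly like this (a sketch reusing ddf and ddf_new from your example):

# workaround 1: keep both objects around and route column access accordingly
ddf['a'].compute()      # original graph, my_function never runs
ddf_new['c'].compute()  # new graph, my_function runs only when 'c' is needed

# workaround 2: materialize all results once, then slice the computed data
ddf_new = ddf_new.persist()  # runs my_function once per partition
ddf_new['a'].compute()       # served from the already-computed partitions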

OK thanks.

The workaround of keeping the original untouched Dask dataframe and the dataframe with the new columns separate would work for me.
In my complex use case this is not as pretty a solution as I would have wished, but I will figure out a way to deal with it.

I don't know the Dask internals. Do you think this is an improvement that could be implemented in Dask?

BR,
Thomas

I'm not really familiar with the Dask internals either, but I think you could just open an issue on the Dask GitHub and ask for it, including your reproducer!

You may want to try dask-expr. It is a rewrite of the existing DataFrame API with the explicit goal of handling optimizations better.

You just need to replace the dask.dataframe import with the dask-expr import and you should be good to go:

import pandas as pd

from dask_expr import from_pandas

# create a simple pandas dataframe with two example columns
df = pd.DataFrame({'a': [0, 1, 2],
                   'b': [2, 3, 4]})

ddf = from_pandas(df, npartitions=1)

def my_function(df):
    print(f'my_function called ({len(df)})')
    return pd.DataFrame.from_dict({'c': df['a'] + df['b']})

ddf_mapped = ddf.map_partitions(my_function, meta={"c": float})
ddf_new = ddf.assign(c=ddf_mapped['c'])
ddf_new["a"].compute()

This properly optimizes what you are asking for:

[image: the optimized task graph]

Dask-expr uses the same computational algorithms as dask.dataframe, so it is safe to use. If you encounter anything that is not implemented yet, you should see an appropriate exception before anything bad happens.