Using DataFrame apply in a loop

Hello there,
I am quite new to Dask and have a question about using apply on a Dask DataFrame. I found that the following code does not give the expected result:

import pandas as pd
import dask.dataframe as dd

numbers = range(0,4)
ddf = dd.from_pandas(pd.DataFrame({'x': [0,1,2,3]}), npartitions=1)
for i in numbers:
    ddf[str(i)] = ddf.x.apply(lambda x: x if x == i else 99, meta=(str(i), 'int64'))

This gives the following answer on compute():

	x	0	1	2	3
0	0	99	99	99	99
1	1	99	99	99	99
2	2	99	99	99	99
3	3	3	3	3	3

However, if I do this outside of a loop, like so:

ddf[str(0)] = ddf.x.apply(lambda x: x if x == numbers[0] else 99, meta=(str(0), 'int64'))
ddf[str(1)] = ddf.x.apply(lambda x: x if x == numbers[1] else 99, meta=(str(1), 'int64'))
ddf[str(2)] = ddf.x.apply(lambda x: x if x == numbers[2] else 99, meta=(str(2), 'int64'))
ddf[str(3)] = ddf.x.apply(lambda x: x if x == numbers[3] else 99, meta=(str(3), 'int64'))

This gives the answer I would expect when doing ddf.compute():

	x	0	1	2	3
0	0	0	99	99	99
1	1	99	1	99	99
2	2	99	99	2	99
3	3	99	99	99	3

Interestingly, if I do the same using a pure pandas DataFrame, I do not have this issue:

pdf = pd.DataFrame({'x': [0,1,2,3]})
for i in range(0,4):
    pdf[str(i)] = pdf.x.apply(lambda x: x if x == i else 99)

This gives the expected result, and pdf at the end is:

	x	0	1	2	3
0	0	0	99	99	99
1	1	99	1	99	99
2	2	99	99	2	99
3	3	99	99	99	3

Can anyone explain what is going on here? To be clear, this is a minimal example I made; the actual situation in my code is more complex, so I cannot avoid apply and simply use column assignment.
I feel that I am missing something fundamental about how Dask does lazy evaluation. Any help or explanation would be very much appreciated!


@Ole Welcome to Discourse!

This is to be expected. Dask evaluates lazily – during the for-loop it only builds the “task graph”, i.e. the logic of your computation. By the time the column values are actually calculated (at the very end, after the loop, when you call compute), the loop variable i is 3, and that value is used to calculate every column. That’s why columns 0 to 3 all look alike.

However, Dask does evaluate metadata, like the new column names, immediately when possible. Hence, the column names themselves are correct.

Does this make sense?

This is a known limitation of parallel and distributed computation, and the best practice is to avoid global state.
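You can see the same late-binding behaviour with plain Python, no Dask involved – each lambda closes over the variable i, not the value i had when the lambda was created:

numbers = range(0, 4)

fns = []
for i in numbers:
    # Each lambda closes over the variable i, not its current value
    fns.append(lambda x: x if x == i else 99)

# The loop has finished, so i == 3 and every lambda behaves like the i == 3 one
print([f(0) for f in fns])  # [99, 99, 99, 99]
print([f(3) for f in fns])  # [3, 3, 3, 3]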

However, if I do this outside of a loop

In this example, you’re indexing numbers with literal integers (numbers[0], numbers[1], …) rather than the loop variable i, so each lambda refers to a fixed value – that’s why you get the correct results.

You can maybe use something like client.submit instead:

import dask.dataframe as dd
import pandas as pd
from dask.distributed import Client

client = Client()

numbers = range(0, 4)

ddf = dd.from_pandas(pd.DataFrame({"x": [0, 1, 2, 3]}), npartitions=1)


def func(i, s):
    # i is passed in as an argument, so each submitted task gets its own value
    return s.apply(lambda a: a if a == i else 99, meta=(str(i), "int64"))


for i in numbers:
    result = client.submit(func, i, ddf.x)
    ddf[str(i)] = result.result()

ddf.compute()
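
Alternatively, if you’d rather keep everything lazy and skip the round trip through the client, you can bind the current value of i when the lambda is defined, for example with a default argument – a minimal sketch of the same loop:

import pandas as pd
import dask.dataframe as dd

numbers = range(0, 4)
ddf = dd.from_pandas(pd.DataFrame({"x": [0, 1, 2, 3]}), npartitions=1)

for i in numbers:
    # i=i evaluates i right now and stores it as the lambda's default,
    # so each column keeps its own value instead of the final loop value
    ddf[str(i)] = ddf.x.apply(lambda x, i=i: x if x == i else 99,
                              meta=(str(i), "int64"))

ddf.compute()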

Thank you so much @pavithraes! I was guessing it was something to do with the lazy evaluation, but I did not quite grasp the role the loop variable played here. I will try using submit as you suggest.
