Hello all,

I wanted to ask that what is the most efficient method of writing a custom function which does some job on a column in a distributed system. For eg: Suppose I wanted to compute sum of squares of all elements of a dataframe(assuming all elements are integers). Then is this piece of code time efficient enough?

#
suppose 10 workers

df = dd.read_csv(“data/**/*.csv”)

df = df.persist()

col_sums = [0]

for col in df.colums:

```
tdf = df[col].apply(lambda x: x*x)
col_sums.append(tdf.sum())
```

sum = sum(col_sums)

print(sum.compute())

Please point out any mistakes or any better way. I doubt that this is the most efficient method, maybe something related to futures might be actually do distributed computing. Please if you have some any other method then reply to this post. I am new to distributed dask, so I am a bit overwhelmed with all of the properties of dask which it offers and cant understand what piece of code would reduce the latency the most.

Sorry but this might be a very trivial doubt, but I was not able to figure out the best way for distributed system even after reading the documentation several times. Thanks for your help.

Hi! I am definitely not an expert in Dask, I just wanted to help as much as I can since noone has offered any help for some days now.

From what I understand, this apply function will be automatically run in parallel, so something like the following will be close to optimal:

sum(df.col.apply(lambda x: x**2).compute())

Again, not an expert, but I am pretty sure that the above is more or less what you’re looking for.

2 Likes

@akshatarun Welcome to Discourse!

Thanks for the answer, @giorgostheo, that’s exactly right!

Here’s an example task graph that shows each column in each partition being computed in parallel:

```
import dask
import dask.dataframe as dd
ddf = dd.DataFrame.from_dict(
{
"x": range(10),
"y": range(10),
},
npartitions=2,
)
result = [ddf[col].apply(lambda x: x*x, meta=(str(col), "int64")).sum() for col in ddf.columns]
dask.visualize(*result)
```

A couple things to keep in mind:

1 Like

Hi @giorgostheo! Thanks a lot for your help. I researched a lot and found some ways to write the custom functions but still not sure about my approaches. Your way is one of the good ways to deal with the above problem. I think few corrections would be there like: df.col throws error etc. According to me the syntactical correct piece of code you mean is this:

sum([df[col].apply(lambda x: x**2) for col in df.columns]).sum().compute()

Please correct me if I am wrong and again thanks for your help.

Thanks for the warm welcome @pavithraes and also for your help! Ya after posting this I researched a lot about the best practices and stuff and came to know about the issue with persist. Not good with big data, which is my case. Also by vectorized operations you mean that the in built functions of dask like .sum(), .mean() etc. are vectorized operations and hence run faster than any custom function we write(which I infer is not vectorized?).

Also one more thing I would like to ask is: For computation of some custom functions on big data sets what is generally preferred: dask dataframes or dask array or a mixture of both? Because few features like skewness etc are only present in dask.array.stats library and not implemented in dask.dataframes…

Hey,

If I understood your initial question correctly, you want to sum the squares of a column (say “col”). If that’s the case, my first reply should work as intended. The code you attached seems to do a sum of all the sums of column squares (I seem one too many “sums” anyway). If that is what you want, the first sum (the one that will be executed last) should contain the whole expression, including compute.

So, if you want the sum of all sqaures for just column “col”, these should work:

sum(df.col.apply(lambda x: x**2).compute()) or df.col.apply(lambda x: x**2).compute().sum()

If you want the sum of sums of all column squares, this should work:

[df[column].apply(lambda x: x**2).compute().sum() for column in df.columns].sum() (renamed col to column so it is not mistaken for col mentioned above, you can name it however you like ofc)

Also, df.col should not error if there is a column named “col”…

@akshatarun I’d suggest using the collection that is best suited for your dataset and computations. Even if a function isn’t implemented in Dask, you can use workarounds like `map_partitions`

/ `map_blocks`

or open a feature request to add that function to Dask.

Note that mixing collection is **not** a good idea. It’s best to stick with one or the other for the entire workflow. But if you really need to, you can convert between them using functions like `dataframe.from_array`

and `DataFrame.to_dask_array`

.

1 Like