Why does Dask run with no results?

Hello, I want to ask why my code has been running for so long and still has not exported any CSV files.

I want to do some calculations on a DataFrame using Dask.
Here is a simplified version of my code.
Step 1: import CSV files

""" 
df1
word freq
A_B 1
C_D 2
E_F 3
"""
"""
df2
w1 w2 freq
A   B    1
C   D   1
E   F    1
"""

import math
import csv
import dask.dataframe as dd

# Step 2: get the frequency from df1
def get_uni_freq(df1, word):
    if len(df1[df1['word'] == word].compute()["word_freq"].values) != 0:
        freq = df1[df1['word'] == word].compute()["word_freq"].values[0]
        return freq
    else:
        freq = 0
        return freq
# Step 3: put everything into the data frame
def calculate_MI(df2):
    print("Calculating MI")
    df2 = df2.rename(columns={"word_freq": "O11"})
    df2["R1"] = df2.w1.map(lambda w: get_uni_freq(df1=df1, word=w))
    df2["C1"] = df2.w2.map(lambda w: get_uni_freq(df1=df1, word=w))
    df2["E11"] = df2.R1 * df2.C1
    df2["MI"] = (df2.O11/df2.E11).map(lambda x: math.log2(x))
    print("Saving")
    df2.to_csv("result.csv", index=False, single_file=True, quoting=csv.QUOTE_NONE, escapechar="\\")

# Step 4: everything runs like this
if __name__ == "__main__":
    df1 = dd.read_csv("file1.csv", encoding="UTF-8")
    df2 = dd.read_csv("file2.csv", encoding="UTF-8")
    calculate_MI(df2=df2)
    print("Done!")

Basically, I want to map each word’s frequency from df1 and do the calculation in df2.
I have been running this on the file for 2 days but still have no results.
I have no idea why it takes so long. By the way, I tested my code on a small version of the data and it works, which makes me even more confused.
So I am here to ask for help with my work. Thanks a lot for taking a look at my question.

Hi @ShiShanLiu, welcome to Dask community!

So I had a quick try at your code, to better understand what you wanted to do. First, there are some inconsistencies in the provided code: you read the DataFrames twice, you search for a word_freq column instead of the freq one in your samples, your data samples have no corresponding values, and you call a get_uni_freq function instead of get_freq. But I guess these are just typos.

But there are other, more important things to look at:

In get_uni_freq, you compute the same values twice. You are also using boolean filtering just to get one value, which is really inefficient, especially with distributed computing. You’d be better off indexing your DataFrame by word once and just using loc to look up the value you need, as in the sketch below.
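For instance, a minimal sketch of that idea (assuming df1 fits in memory once computed, and that its columns are named word and freq as in your sample):

# Build the lookup table once: index df1 by word and compute it to a pandas Series
uni = df1.set_index("word")["freq"].compute()

def get_uni_freq(word):
    # loc-style lookup, falling back to 0 when the word is not in df1
    return uni.loc[word] if word in uni.index else 0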

For every word in df2, you’ll compute a result in df1. This will put a lot of burden on Dask, with a great amount of overhead, and lead to a really complex graph if you’ve got a lot of values. I think what you want to do is just a join on word, so why don’t you do that? See the rough sketch below.
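Roughly something like this (only a sketch; I’m assuming the column names freq, w1 and w2 from your samples):

df1 = df1.rename(columns={"freq": "uni_freq"})
df2 = df2.rename(columns={"freq": "O11"})

# Left-join the unigram frequencies onto w1 and then onto w2
df2 = df2.merge(df1, left_on="w1", right_on="word", how="left").drop(columns="word").rename(columns={"uni_freq": "R1"})
df2 = df2.merge(df1, left_on="w2", right_on="word", how="left").drop(columns="word").rename(columns={"uni_freq": "C1"})

# Words missing from df1 get frequency 0, as in your original function
df2["R1"] = df2["R1"].fillna(0)
df2["C1"] = df2["C1"].fillna(0)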

Beware of the use of single_file: it can cause things to be written sequentially and might take a long time. But this is the last optimization to discuss; see the note below.
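For reference, the default behaviour writes one file per partition in parallel, and you can name the outputs with a wildcard:

df2.to_csv("result-*.csv", index=False)  # writes result-0.csv, result-1.csv, ... in parallel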

One thing that I also wanted to ask is whether you really need Dask instead of Pandas. How big are your input files? Or maybe you might want to use Dask only for df2?

First, thanks a lot for your reply :smile:
I will reply to your points one by one below:

  1. That was my mistake, posting read_csv() twice; the correct CSV import is in Step 4.
  2. The get_freq function is a typo; it should be get_uni_freq (they are the same function).
  3. I want to check whether w1 and w2 in df2 have a corresponding freq in df1; if not, the get_uni_freq function returns 0. I think you have given me a better solution: map the freq from df1 by indexing on word and using loc.
  4. I understand single_file=True will slow down the export, but I still want to use it because I just don’t want too many output files.
  5. Finally, my input file is around 800MB with 40 million rows. Maybe I can try Pandas first and give feedback later.
    Again, thanks for your response, I really appreciate it :pray:t2:

Are both your input files this size, or just one of them? 800MB is quite small for Dask; most recent laptops have plenty of memory to handle this, and your current code will be much more efficient with Pandas.

If you need Dask anyway, I would recommend trying to write your code as a join between DataFrames.

That’s what this forum is for :slight_smile:!


Currently, only one file is about 800MB; the other is much smaller.
You are right. I used Pandas to do the same thing, and I successfully got the results I wanted.

About the last part, where you mentioned a “join between DataFrames”, do you mean that one DataFrame should use Pandas and the other Dask?

By the way, I am still curious: at what file size do people typically switch to Dask in pursuit of better performance?

Not necessarily, even if that could be much more performant, especially with one small file. I meant using the join function, because that is what you are doing in the end. It would potentially be faster with Pandas alone too; see the sketch below.
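For instance, with Pandas only, the core of it would be something like this (again just a sketch, with the word/freq column names assumed from your samples):

import pandas as pd

df1 = pd.read_csv("file1.csv").rename(columns={"freq": "uni_freq"})
df2 = pd.read_csv("file2.csv").rename(columns={"freq": "O11"})
# Same left joins as with Dask, only on in-memory DataFrames
df2 = df2.merge(df1, left_on="w1", right_on="word", how="left").rename(columns={"uni_freq": "R1"})
df2 = df2.merge(df1, left_on="w2", right_on="word", how="left").rename(columns={"uni_freq": "C1"})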

See also Dask DataFrames Best Practices — Dask documentation.

Typically, when the dataset does not fit in RAM. Dask can also be useful for taking advantage of multiple cores when you do expensive computations. But for data manipulation, if Pandas is enough, it’s often the best choice.

Sorry for the late reply. I think I understand Dask better now.
Thank you for your warm and helpful answers. :hugs: :pray:t2:
