I am trying to use Dask distributed to map a word-count function onto each partition of a Dask DataFrame. However, final_res.compute() produces duplicate results, and it seems that the word_count_on_partition function is applied to the complete Dask DataFrame 12 times instead of once per partition.
Thanks a lot!
import dask.dataframe as dd

def word_count_on_partition(name_s, stop_words):
    # imports inside the function so they are available on every worker
    import jieba
    import pandas as pd
    from collections import defaultdict

    # count words longer than one character that are not stop words
    word_counts = defaultdict(int)
    for one_sku_name in name_s["sku_name"]:
        for word in jieba.cut(one_sku_name):
            if len(word) > 1 and word not in stop_words:
                word_counts[word] += 1
    # return the counts as a one-column DataFrame indexed by word
    word_counts = pd.DataFrame.from_dict(word_counts, orient='index')
    return word_counts
df = dd.read_parquet("s3://sku/****",
                     storage_options=sss_option,
                     columns=["brand_cn", "sku_name", "sales_value",
                              "sales_volume", "date"],
                     filters=[[("cat_first_cn", "=", cat)]]
                     )
df = df[df["date"]=="2022-08"].repartition(npartitions=12)
final_res = df.map_partitions(word_count_on_partition, stop_words)
Debug:
I tried reducing npartitions and found that the word_count_on_partition function is executed npartitions times, each time apparently on the complete dataframe. Why does this happen? Am I misunderstanding the map_partitions() function?
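A check I plan to run to see what each partition actually holds (as far as I know, map_partitions(len) is the standard way to count rows per partition, so the check itself should be sound):

# How many rows does each partition actually receive?
# If every value equals the full row count, the data really is duplicated;
# if the values sum to the full count, the partitioning itself is fine.
partition_sizes = df.map_partitions(len).compute()
print(partition_sizes)        # one row count per partition
print(partition_sizes.sum())  # compare against the total number of rows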
Is it perhaps because this line:
for one_sku_name in name_s["sku_name"]:
loops over the entire dataframe? Is there any way to loop over only the current partition?
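For completeness, this is the call pattern I expected to be correct; the explicit meta is only my guess at the per-partition output (one unnamed int64 column with words as the index), since its shape differs from the input partition:

import pandas as pd

# Each call should receive ONE partition as a plain pandas DataFrame,
# so iterating name_s["sku_name"] inside the function should touch only
# that partition's rows.
final_res = df.map_partitions(
    word_count_on_partition,
    stop_words,
    # assumed meta: a single unnamed int64 column, indexed by word
    meta=pd.DataFrame({0: pd.Series(dtype="int64")}),
)
result = final_res.compute()

My expectation is that compute() simply concatenates the twelve per-partition frames, so a word appearing in several partitions would show up more than once in the index, but I would not expect twelve identical copies of the full result.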