Dask map_partitions() produces duplicated results instead of one result per partition

I am trying to use dask.distributed to map a word-count function onto each partition of a dask dataframe. However, final_res.compute() produces duplicated results, and it looks as if word_count_on_partition is applied to the complete dask dataframe 12 times instead of once per partition.

Thanks a lot!

def word_count_on_partition(name_s, stop_words):
    import jieba
    import pandas as pd
    from collections import defaultdict
    # name_s should be a single pandas partition; count the segmented words
    # of every sku_name in it, skipping one-character words and stop words
    word_counts = defaultdict(int)
    for one_sku_name in name_s["sku_name"]:
        for word in jieba.cut(one_sku_name):
            if len(word) > 1 and word not in stop_words:
                word_counts[word] += 1
    return pd.DataFrame.from_dict(word_counts, orient='index')

The dataframe is built and the function is mapped like this:

df = dd.read_parquet("s3://sku/****",
                     storage_options=sss_option,
                     columns=["brand_cn", "sku_name", "sales_value",
                              "sales_volume", "date"],
                     filters=[[("cat_first_cn", "=", cat)]]
                    )
df = df[df["date"]=="2022-08"].repartition(npartitions=12)
final_res = df.map_partitions(word_count_on_partition, stop_words)
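
To confirm that the repartition really produced 12 partitions and to see how many rows each one holds, the layout can be inspected before mapping. A quick check that could be run (a sketch; df is the dataframe built above):

print(df.npartitions)                    # should be 12 after the repartition
print(df.map_partitions(len).compute())  # row count of each partition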

Debug:
I tried reducing npartitions and found that word_count_on_partition is executed npartitions times, each time apparently on the complete dataframe. Why does this happen? Am I misunderstanding how map_partitions() works?
Could it be that this line:
for one_sku_name in name_s["sku_name"]:
loops over the entire dataframe? Is there a way to loop over only the rows of the current partition?
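
For reference, here is a minimal standalone sketch (synthetic data, not my real parquet files) of how I understand map_partitions is supposed to behave: each call should receive a single pandas partition, and compute() should concatenate the per-partition results.

import pandas as pd
import dask.dataframe as dd

# Synthetic example: 12 rows split into 3 partitions
pdf = pd.DataFrame({"sku_name": ["item %d" % i for i in range(12)]})
ddf = dd.from_pandas(pdf, npartitions=3)

def count_rows(part):
    # part is a pandas DataFrame holding only this partition's rows
    return pd.DataFrame({"rows": [len(part)]})

print(ddf.map_partitions(count_rows).compute())
# I would expect three rows of 4 here, one per partition,
# rather than each call seeing all 12 rows.

If my real dataframe behaves differently from this sketch, that would suggest the problem is in the read_parquet/repartition part rather than in the counting function itself.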