I’m trying to parallelize a function from the Bib-Dedupe GitHub repo:
```python
import re

import colrev.env.language_service
import pandas as pd
from colrev.constants import Fields
from rapidfuzz import fuzz


def calculate_similarities(pairs_df: pd.DataFrame) -> pd.DataFrame:
    # Add similarities if both fields exist
    similarity_functions = {
        Fields.AUTHOR: calculate_author_similarity,
        Fields.PAGES: calculate_page_similarity,
        Fields.TITLE: calculate_title_similarity,
        Fields.YEAR: calculate_year_similarity,
        Fields.NUMBER: calculate_number_similarity,
        Fields.CONTAINER_TITLE: calculate_container_similarity,
        "title_partial_ratio": calculate_title_partial_ratio,
        Fields.VOLUME: calculate_token_sort_ratio_similarity,
        Fields.ABSTRACT: calculate_token_sort_ratio_similarity,
        Fields.ISBN: calculate_token_sort_ratio_similarity,
        Fields.DOI: calculate_token_sort_ratio_similarity,
    }
    for field, function in similarity_functions.items():
        if function == calculate_token_sort_ratio_similarity:
            pairs_df[field] = pairs_df.apply(function, args=(field,), axis=1)
        else:
            pairs_df[field] = pairs_df.apply(function, axis=1)
    return pairs_df
```
The function loops over the columns and applies a different similarity function depending on the column. Two questions:
- It would be straightforward to convert the dataframe to a Dask dataframe and then use `map_partitions` with the custom functions. However, the underlying functions call methods from external libraries, such as `token_sort_ratio` from the `rapidfuzz` library:
```python
def calculate_author_similarity(row: pd.Series) -> float:
    if "author_1" in row and "author_2" in row:
        author_1 = str(row["author_1"])
        author_2 = str(row["author_2"])
        abbreviated_similarity = 0
        if len(author_1) > 200 or len(author_2) > 200:
            abbreviated_similarity = (
                fuzz.token_sort_ratio(author_1[:200], author_2[:200]) / 100
            )
        author_partial_diff = fuzz.partial_ratio(author_1, author_2) / 100
        author_diff = fuzz.token_sort_ratio(author_1, author_2) / 100
        author_full_diff = (
            fuzz.token_sort_ratio(str(row["author_full_1"]), str(row["author_full_2"]))
            / 100
        )
        author_diff = max(
            author_diff,
            author_full_diff,
            author_partial_diff,
            abbreviated_similarity,
        )
        return author_diff
    return 0
```
How do I give the worker nodes access to these external libraries? Is Dask even set up for this? I asked Bard, but I couldn’t verify its answer anywhere else, so it may well be a worthless hallucination. (See the first sketch after this list for the direction I have in mind.)
- As you can see, the function cycles through every column. What I actually need is a partition per column, but I’m fairly sure creating a separate Dask dataframe for every column would be inefficient. Since there aren’t many columns, I’d prefer to keep the loop over them. Given that, how should I partition the dataframe so that I can chunk each column (series) and send it off for parallel processing? (See the second sketch after this list for what I mean.)
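
For the first question, here is a minimal sketch of the direction I had in mind, assuming a local `dask.distributed` cluster; the `npartitions` value is arbitrary, and whether the workers can actually import `rapidfuzz` is exactly what I’m asking:

```python
import dask.dataframe as dd
from dask.distributed import Client

# Start a local cluster. My understanding is that each worker process needs
# rapidfuzz (and colrev) importable in its own environment -- this is the
# part I am unsure about.
client = Client()

# Split the pandas DataFrame into partitions and run the existing row-wise
# logic on each partition in parallel. npartitions=8 is an arbitrary choice.
ddf = dd.from_pandas(pairs_df, npartitions=8)
result = ddf.map_partitions(calculate_similarities).compute()
```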
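
And for the second question, a rough sketch of what I mean by chunking each column: using `dask.delayed` to turn each (column, function) pair from the loop above into its own task (this assumes `similarity_functions` and `pairs_df` are in scope; whether this is an efficient way to partition is what I’m asking):

```python
import dask

# One delayed task per column: each task applies that column's similarity
# function row-wise, mirroring the loop in calculate_similarities.
tasks = {}
for field, function in similarity_functions.items():
    if function == calculate_token_sort_ratio_similarity:
        tasks[field] = dask.delayed(pairs_df.apply)(function, args=(field,), axis=1)
    else:
        tasks[field] = dask.delayed(pairs_df.apply)(function, axis=1)

# Compute all column tasks in parallel, then write the results back.
results = dask.compute(*tasks.values())
for field, series in zip(tasks, results):
    pairs_df[field] = series
```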