Is it possible to call functions from external libraries within a map_partitions function?

I’m trying to parallelize a function from the Bib-Dedupe GitHub repo:

import re

import colrev.env.language_service
import pandas as pd
from colrev.constants import Fields
from rapidfuzz import fuzz

def calculate_similarities(pairs_df: pd.DataFrame) -> pd.DataFrame:
    # Add similarities if both fields exist

    similarity_functions = {
        Fields.AUTHOR: calculate_author_similarity,
        Fields.PAGES: calculate_page_similarity,
        Fields.TITLE: calculate_title_similarity,
        Fields.YEAR: calculate_year_similarity,
        Fields.NUMBER: calculate_number_similarity,
        Fields.CONTAINER_TITLE: calculate_container_similarity,
        "title_partial_ratio": calculate_title_partial_ratio,
        Fields.VOLUME: calculate_token_sort_ratio_similarity,
        Fields.ABSTRACT: calculate_token_sort_ratio_similarity,
        Fields.ISBN: calculate_token_sort_ratio_similarity,
        Fields.DOI: calculate_token_sort_ratio_similarity,
    }

    for field, function in similarity_functions.items():
        if function == calculate_token_sort_ratio_similarity:
            pairs_df[field] = pairs_df.apply(function, args=(field,), axis=1)
        else:
            pairs_df[field] = pairs_df.apply(function, axis=1)

    return pairs_df

The function loops over the columns and applies a different similarity function depending on the column. Two questions:

  1. It would be straightforward to convert the dataframe to a Dask DataFrame and then use map_partitions with the custom functions. However, the underlying functions call methods from external libraries, such as token_sort_ratio from the rapidfuzz library:
def calculate_author_similarity(row: pd.Series) -> float:
    if "author_1" in row and "author_2" in row:
        author_1 = str(row["author_1"])
        author_2 = str(row["author_2"])

        abbreviated_similarity = 0
        if len(author_1) > 200 or len(author_2) > 200:
            abbreviated_similarity = (
                fuzz.token_sort_ratio(author_1[:200], author_2[:200]) / 100
            )

        author_partial_diff = fuzz.partial_ratio(author_1, author_2) / 100
        author_diff = fuzz.token_sort_ratio(author_1, author_2) / 100
        author_full_diff = (
            fuzz.token_sort_ratio(str(row["author_full_1"]), str(row["author_full_2"]))
            / 100
        )
        author_diff = max(
            author_diff,
            author_full_diff,
            author_partial_diff,
            abbreviated_similarity,
        )
        return author_diff
    return 0

How do I give the worker nodes access to external libraries? Is Dask even set up for this? I asked Bard but couldn’t verify its answer anywhere else, so it may well be a hallucination. (A sketch of what I have in mind is just after this list.)

  2. As you can see, the function cycles through every column. What I actually need is a partition of every column, but I’m pretty sure creating a separate Dask dataframe for every column is inefficient. Since there aren’t many columns, I’d prefer to keep cycling through them. In light of that, how should I partition the dataframe so that I can chunk each column (series) and send it for parallel processing?
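To be concrete, something along these lines is what I had in mind (an untested sketch; it assumes the similarity helpers above are importable on every worker, which is what question 1 is about):

import dask.dataframe as dd

# untested sketch: run the whole per-row loop on each pandas chunk in parallel
ddf = dd.from_pandas(pairs_df, npartitions=8)  # npartitions roughly = number of workers
pairs_df = ddf.map_partitions(
    calculate_similarities,  # the function shown above, applied once per partition
    # a meta= argument may be needed if Dask cannot infer the output schema
).compute()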

Hi @velosipednikov,

Which kind of Dask deployment are you using? LocalCluster, Kubernetes, HPC systems?

Ideally, Worker nodes should have the same Python environment as the Client. This is easy with LocalCluster; on HPC systems you just need to have the environment on a shared directory; with Kubernetes, use the same Docker image everywhere. There are also other ways, like Worker Plugins (see Plugins — Dask.distributed documentation).
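For instance, the built-in PipInstall worker plugin can install extra packages on each worker at startup (a minimal sketch; the package list is only an example):

from dask.distributed import Client, PipInstall

client = Client()  # or Client(cluster) for a remote deployment

# Install the libraries your functions need on every worker.
plugin = PipInstall(packages=["rapidfuzz"], pip_options=["--upgrade"])
client.register_plugin(plugin)  # register_worker_plugin on older versions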

Does the algorithm need to have the entire depth of one column in memory? Or is row processing independent?

Which kind of Dask deployment are you using? LocalCluster, Kubernetes, HPC systems?

Let’s say I want to set it up both locally on my machine and deployed on AzureML.
What would be the procedure to set up the same Python environment on the worker nodes in those two scenarios?

Does the algorithm need to have the entire depth of one column in memory? Or is row processing independent?

Row processing is independent. So if I have a vector of length n, I’d like to break it up into chunks of size n divided by the number of workers, and send one chunk to each worker.

A local setup (with LocalCluster) uses the same environment for the Client, Scheduler and Workers.

For AzureML, I’m not sure, but it would probably be a dask-cloudprovider or dask-kubernetes deployment, so you’ll need to use a Docker image, the same one for every component.
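On Azure with dask-cloudprovider, that could look roughly like the following (a sketch only; the resource names and image are placeholders, and AzureML itself may need a different integration):

from dask.distributed import Client
from dask_cloudprovider.azure import AzureVMCluster

# Every VM pulls the same Docker image, so Client and Workers share one environment.
cluster = AzureVMCluster(
    resource_group="my-resource-group",      # placeholder
    vnet="my-vnet",                          # placeholder
    security_group="my-security-group",      # placeholder
    n_workers=4,
    docker_image="myregistry.azurecr.io/dedupe-env:latest",  # placeholder
)
client = Client(cluster)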

Well, then it’s just standard partitioning: depending on how you read the data, you can configure a chunk size (in bytes or number of rows), and your data will be processed by the workers chunk by chunk.
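For example (a sketch; the file name and sizes are placeholders):

import dask.dataframe as dd

# 1) When reading from disk, control the chunk size in bytes:
ddf = dd.read_csv("pairs.csv", blocksize="64MB")  # placeholder path

# 2) When starting from an in-memory pandas DataFrame, control the number of partitions:
ddf = dd.from_pandas(pairs_df, npartitions=4)  # e.g. one partition per worker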

To confirm, for the LocalCluster option, I don’t need to pass an image to the worker nodes? So as long as I’m in a virtual environment when I run the code, the Workers will replicate the main environment as well?

Yes, to be clear, the Workers won’t replicate the environment; they will just use the same environment you are using when you create the LocalCluster object.

Sorry, I’m not following your statement exactly. If I run client = Client() to utilize my available cores in a virtual environment with the libraries that I need, then please confirm that the workers will also have a copy of that environment when they’re executing the commands.

Yes, they will. client = Client() is a shortcut to

from dask.distributed import LocalCluster, Client

cluster = LocalCluster()
client = Client(cluster)
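If you want to double-check, you can ask each worker to report its interpreter and confirm it can import the external library (a small sketch):

from dask.distributed import Client

client = Client()  # LocalCluster under the hood

def worker_env():
    # runs on each worker; fails loudly if the library is missing there
    import sys
    import rapidfuzz
    return sys.executable, rapidfuzz.__version__

# returns a dict keyed by worker address
print(client.run(worker_env))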