Map_partitions question for image processing

I am having trouble using map_partitions:

import numpy as np
import dask.dataframe as dd
from dask.distributed import Client
from dask.distributed import progress
from PIL import Image

client = Client()
DATA_URL = "metadata.csv"
feature_names = ["image_path", "label"]
# np.str was removed in NumPy 1.24; the builtin str is equivalent
dtypes = {'image_path': str, 'label': np.int16}
df0 = dd.read_csv(DATA_URL, names=feature_names, dtype=dtypes)
df = df0.sample(frac=0.001)

new_df = df.repartition(npartitions=2)

def preprocess(path):
    # open the image and sum its pixel values
    im = Image.open(path)
    pixels = list(im.getdata())
    return sum(pixels)

# this works for me (just calling apply on every image_path and getting the sum of pixels for every image)
sum_col = new_df.image_path.apply(lambda x: preprocess(str(x)), meta=int)
# but why doesn't map_partitions work?
sum_col = new_df.image_path.map_partitions(lambda x: preprocess(str(x)), meta=int)

Why is map_partitions taking the entire dataframe as a single string, while apply is taking each row separately?

Hi @vigneshn1997, thanks for the question! I separated it into a new topic for clarity.

With apply, the lambda calls preprocess(str(x)) on each value in the new_df.image_path series, as you expected. With map_partitions, the lambda is called once per partition and receives that partition as a whole pandas Series, so str(x) turns the entire Series into a single string. To get per-value behavior, the function you pass to map_partitions has to apply preprocess to each value itself. Here’s a minimal example:

import pandas as pd
import dask.dataframe as dd
from dask.distributed import Client

# use the distributed client
client = Client()

# create simple dask dataframe
ddf = dd.from_pandas(
    pd.DataFrame({'image_path': ['x.jpg'] * 10, 'label': range(1, 11)}),
    npartitions=2
)

# simplification of your function
def preprocess(path):
    return len(path)

# expected result, using apply
sum_col1 = ddf.image_path.apply(lambda x: preprocess(str(x)), meta=('sum_col1', int))

# not what we want, sum_col2 is a 2-element series of '86'...
sum_col2 = ddf.image_path.map_partitions(lambda x: preprocess(str(x)), meta=('sum_col2', int))
# returns a 'string-ified' version of the image_path series, with a length of 86
len(ddf.image_path.map_partitions(lambda x: str(x)).partitions[0].compute()[0])

# expected result, using map_partitions
sum_col3 = ddf.image_path.map_partitions(
    lambda x: x.apply(lambda y: preprocess(str(y))),
    meta=('sum_col3', int)
)
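
To see the difference without involving Dask at all, here is a small pandas-only sketch. It assumes each partition behaves like the plain pandas Series below; the names partition, whole_series_length and per_value_lengths are illustrative and not from the original post.

```python
import pandas as pd

# Stand-in for one partition of ddf.image_path (hypothetical data).
partition = pd.Series(["x.jpg"] * 3, name="image_path")


def preprocess(path):
    return len(path)


# What the map_partitions lambda effectively did: stringify the whole Series
# (index and dtype footer included) and measure that one string.
whole_series_length = preprocess(str(partition))

# What was intended: call preprocess once per value.
per_value_lengths = partition.apply(lambda y: preprocess(str(y)))

print(whole_series_length)         # a single integer for the whole partition
print(per_value_lengths.tolist())  # one length per path
```

The first result is one number per partition, which is why sum_col2 ends up with only as many elements as there are partitions.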

It’s also worth noting that passing meta=int will not work in the future. If you’re using the latest version of Dask, you’ll see this warning:

FutureWarning: Meta is not valid, `map_partitions` expects output to be a pandas object. Try passing a pandas object as meta or a dict or tuple representing the (name, dtype) of the columns. In the future the meta you passed will not work.

Thank you very much @scharlottej13 for the clarification. I wanted to know if there is some way to make the map_partitions computation run on a specific worker. Can I use client.submit to submit the map_partitions call to a specific worker?

This is so that I don’t have to call .compute to materialize the computation on the scheduler.

No problem @vigneshn1997! I’d be curious to hear more about why you’d like to avoid submitting work through the client. When a Client is instantiated, it automatically becomes the default scheduler for Dask collections (including map_partitions calls) and distributes tasks to the available workers. The Dask distributed documentation on managing computation also covers asynchronous computation; perhaps that is what you’re looking for?

Each worker will have a different partition of the images, and I want each worker to process only its own partition (if it tries to access any other image, it will get a file-not-found error). I am trying to achieve a data-parallel setup this way.

Have you already tried using the workers parameter of Client.compute?


Yes, I was able to assign tasks to workers using the workers parameter.
