How do you pipe multiple arguments in a dask bag pipeline?

When building a pipeline in dask.bag, I usually do this to pipe several arguments between functions:

from functools import wraps

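def star(f):
    # Adapt f so it can be called with a single tuple: star(f)((a, b)) calls f(a, b).
    # Any further positional/keyword arguments are passed through after the tuple's items.
    @wraps(f)
    def wrapper(*args, **kwargs):
        return f(*args[0], *args[1:], **kwargs)
    return wrapper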

...

(bag
 .from_sequence(urls)
 .map(lambda url: (url, download(url)))
 .map(star(lambda url, data: (url, process_data(data))))
 .filter(star(lambda url, result: result is not None))
 .map(star(lambda url, result: save_for_url(url, result)))
 )

This allows me to define functions with several arguments instead of a single tuple argument. How do you deal with this quirk? Is there a better way? I guess dask could provide a starmap method, similar to itertools.starmap in Python’s standard library.

Ian.

@ian.liu88 Thanks for your question!

Still thinking about a better solution, but here is a super-minimal example that reproduces this issue:

import dask.bag as db

l = [("a", "b"), ("c", "d")]

l_bag = db.from_sequence(l)

# TypeError: <lambda>() missing 1 required positional argument: 'y'
l_bag.map(lambda x, y: x + y).compute()

# With star() -- works!
l_bag.map(star(lambda x, y: x + y)).compute()
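For comparison, the standard-library itertools.starmap mentioned in the question does the same unpacking for plain iterables. A small pure-Python sketch (the star wrapper is copied from the question so the snippet runs on its own; add is just an illustrative function):

```python
from functools import wraps
from itertools import starmap


def star(f):
    # Unpack the first positional argument (a tuple) into separate arguments,
    # passing any extra positional/keyword arguments through unchanged
    @wraps(f)
    def wrapper(*args, **kwargs):
        return f(*args[0], *args[1:], **kwargs)
    return wrapper


def add(x, y):
    return x + y


pairs = [("a", "b"), ("c", "d")]

# Standard-library starmap: calls add("a", "b"), then add("c", "d")
via_starmap = list(starmap(add, pairs))

# Plain map needs the wrapper to do the same unpacking
via_star = list(map(star(add), pairs))

print(via_starmap)  # ['ab', 'cd']
print(via_star)     # ['ab', 'cd']

# star also forwards extra positional arguments after the tuple's items:
# star(f)((1, 2), 3) calls f(1, 2, 3)
print(star(lambda a, b, c: a + b + c)((1, 2), 3))  # 6
```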

@ian.liu88 It looks like Dask Bag does provide a starmap method:

l_bag.starmap(lambda x, y: x + y).compute()
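Bag.starmap also accepts keyword arguments, which it passes on to every call of the function. A minimal sketch on the same example data (the sep parameter is only an illustration, and scheduler="sync" just keeps the run single-threaded):

```python
import dask.bag as db

pairs = db.from_sequence([("a", "b"), ("c", "d")])

# starmap unpacks each tuple into positional arguments;
# the extra keyword argument (sep) is passed to every call
joined = pairs.starmap(lambda x, y, sep: x + sep + y, sep="-").compute(scheduler="sync")
print(joined)  # ['a-b', 'c-d']
```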

But for your original pipeline, we would presumably also need a starfilter, which Dask doesn’t currently have, though perhaps it could be added.
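Applied to the original pipeline, starmap covers the map steps, and a small wrapper can stand in for the missing starfilter. A minimal sketch, assuming toy stand-ins for download, process_data and save_for_url (these, and the starpred helper, are hypothetical and not part of Dask):

```python
import dask.bag as db


# Hypothetical stand-ins for the user's own functions
def download(url):
    return url.upper()


def process_data(data):
    # Simulate a failure (None) for some inputs
    return None if data.startswith("BAD") else data.lower()


def save_for_url(url, result):
    return f"saved {url}: {result}"


def starpred(pred):
    # Hypothetical helper: let a multi-argument predicate accept one tuple,
    # standing in for the missing Bag.starfilter
    def wrapper(args):
        return pred(*args)
    return wrapper


urls = ["good.example", "bad.example", "also-good.example"]

saved = (
    db.from_sequence(urls)
    .map(lambda url: (url, download(url)))
    # starmap unpacks each (url, data) tuple into two arguments
    .starmap(lambda url, data: (url, process_data(data)))
    # No starfilter, so the predicate is wrapped to unpack the tuple itself
    .filter(starpred(lambda url, result: result is not None))
    .starmap(save_for_url)
    .compute(scheduler="sync")
)
print(saved)
```

Only the filter step still needs an adapter, much like the star wrapper from the question; every step keeps its multi-argument signature.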

Oh, wow, I didn’t realize it had a starmap! Thanks for pointing that out.

I guess I can live with my “hack” for filter operations.
