@delayed(nout=2) equivalent in dd.from_map?

Hello,
I have read on Stack Overflow that the delayed decorator has an argument nout which allows a delayed function to return two dataframes. I have also read in the docs that dask.dataframe.from_delayed might be deprecated in the future, and that from_map is preferred. Does from_map have an equivalent to from_delayed’s nout functionality which would allow me to return two dataframes?

Hi @dasking_for_a_friend,

I’m not sure I’m following correctly: from_delayed itself does not have nout functionality.
Could you write at least a pseudocode example of what you are trying to achieve?
Do you want to build a Dask DataFrame using from_delayed on a Delayed object with multiple outputs?

Thank you for your reply. Here’s a better explanation of what I’m trying to do, with a minimal reproducible example:

import pandas as pd
import dask.dataframe as dd
import dask
from dask.distributed import Client

def get_data(fname):
    # Example data; in reality it comes from a file format
    # not supported by dask dataframe directly.
    df = pd.DataFrame(data=[[1,'a'],[3,'b'],[2,'a'],[6,'a'],[4,'b'],[5,'b']],
                      columns=['nums','letters'])

    df['nums'] = df['nums']*fname

    df_a = df[df['letters']=='a']
    df_b = df[df['letters']=='b']

    return (df_a, df_b)


if __name__=="__main__":
    client = Client()

    filenames = [100,1000]
    
    df = dd.from_map(get_data,filenames,meta=tuple)

    df_a = df[0]
    df_b = df[1]

    #Do lots of other things with df_a and df_b lazily

    print(df_a.compute())
    print(df_b.compute())
    
    client.shutdown()

Here’s the traceback:

---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
File ~/.local/lib/python3.10/site-packages/dask_expr/_core.py:470, in Expr.__getattr__(self, key)
    469 try:
--> 470     return object.__getattribute__(self, key)
    471 except AttributeError as err:

File /usr/lib/python3.10/functools.py:981, in cached_property.__get__(self, instance, owner)
    980 if val is _NOT_FOUND:
--> 981     val = self.func(instance)
    982     try:

File ~/.local/lib/python3.10/site-packages/dask_expr/_expr.py:2062, in Projection._meta(self)
   2059 if not isinstance(self.operand("columns"), (list, slice)) and not hasattr(
   2060     self.operand("columns"), "dtype"
   2061 ):
-> 2062     return meta_nonempty(self.frame._meta).iloc[0]
   2063 # Avoid column selection for Series/Index

AttributeError: 'str' object has no attribute 'iloc'

During handling of the above exception, another exception occurred:

RuntimeError                              Traceback (most recent call last)
Cell In[1], line 23
     19 filenames = [100,1000]
     21 df = dd.from_map(get_data,filenames,meta=tuple)
---> 23 df_a = df[0]
     24 df_b = df[1]
     26 print(df_a.compute())

File ~/.local/lib/python3.10/site-packages/dask_expr/_collection.py:413, in FrameBase.__getitem__(self, other)
    411 elif isinstance(other, list):
    412     other = other.copy()
--> 413 return new_collection(self.expr.__getitem__(other))

File ~/.local/lib/python3.10/site-packages/dask_expr/_collection.py:4803, in new_collection(expr)
   4801 def new_collection(expr):
   4802     """Create new collection from an expr"""
-> 4803     meta = expr._meta
   4804     expr._name  # Ensure backend is imported
   4805     return get_collection_type(meta)(expr)

File ~/.local/lib/python3.10/site-packages/dask_expr/_core.py:475, in Expr.__getattr__(self, key)
    471 except AttributeError as err:
    472     if key.startswith("_meta"):
    473         # Avoid a recursive loop if/when `self._meta*`
    474         # produces an `AttributeError`
--> 475         raise RuntimeError(
    476             f"Failed to generate metadata for {self}. "
    477             "This operation may not be supported by the current backend."
    478         )
    480     # Allow operands to be accessed as attributes
    481     # as long as the keys are not already reserved
    482     # by existing methods/properties
    483     _parameters = type(self)._parameters

RuntimeError: Failed to generate metadata for ArrowStringConversion(frame=FromMap(b05e124))[0]. This operation may not be supported by the current backend.

Right now, I have a function that works by returning the whole dataframe, and I just split df into df_a and df_b lazily after the from_map call (sketched below). But I would also like to return the min and max values of the nums column of df_a and df_b, to pass as divisions when setting the index on those dataframes, and I thought it would be most efficient to gather that information inside my get_data function. The more I think about this, though, the more I suspect my overall approach is wrong.
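
For reference, my current working approach looks roughly like this (get_whole_df is a stand-in for my real file reader):

def get_whole_df(fname):
    # In reality this parses a binary file; same toy data as above
    df = pd.DataFrame(data=[[1,'a'],[3,'b'],[2,'a'],[6,'a'],[4,'b'],[5,'b']],
                      columns=['nums','letters'])
    df['nums'] = df['nums']*fname
    return df

df = dd.from_map(get_whole_df, filenames)

# Lazy split after the from_map call
df_a = df[df['letters']=='a']
df_b = df[df['letters']=='b']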

Yes, dask.delayed does come with this functionality. Its signature is:

def delayed(obj, name=None, pure=None, nout=None, traverse=True):

Dask Delayed Source Code

so based on the API docs:

nout : int, optional
    The number of outputs returned from calling the resulting ``Delayed``
    object. If provided, the ``Delayed`` output of the call can be iterated
    into ``nout`` objects, allowing for unpacking of results. By default
    iteration over ``Delayed`` objects will error. Note, that ``nout=1``
    expects ``obj`` to return a tuple of length 1, and consequently for
    ``nout=0``, ``obj`` should return an empty tuple
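
For instance, a minimal illustration of nout unpacking (two_outputs is just a toy function):

import dask

@dask.delayed(nout=2)
def two_outputs(x):
    # nout=2 tells Dask this returns a 2-tuple, so the
    # Delayed result can be unpacked into two Delayed objects
    return x + 1, x - 1

a, b = two_outputs(10)
print(dask.compute(a, b))  # (11, 9)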

That being said, I’m not sure what exactly you are trying to achieve. Do you want to return a tuple of dataframes instead of a single dataframe for every delayed object?

Thank you for your reply. I was able to achieve what I’m trying to do using dask delayed with nout. Here is a working minimal reproducible example:

import pandas as pd
import dask.dataframe as dd
import dask
from dask.distributed import Client
import dask.array as da
import numpy as np

@dask.delayed(nout=4)
def get_data(fname):
    df = pd.DataFrame(data=[[1,'a'],[3,'b'],[2,'a'],[6,'a'],[4,'b'],[5,'b']],
                      columns=['nums','letters'])
    df['nums'] = df['nums']*fname
    
    df_a = df[df['letters']=='a']
    df_b = df[df['letters']=='b']

    # Per-file min and max of 'nums' for each split; used later as divisions
    divs_a = np.array([df_a['nums'].min(), df_a['nums'].max()])
    divs_b = np.array([df_b['nums'].min(), df_b['nums'].max()])

    return (df_a, df_b, divs_a, divs_b)


if __name__=="__main__":
    client = Client()

    filenames = [100,1000]

    dfs_a = []
    dfs_b = []
    dfs_divs_a = []
    dfs_divs_b = []
    
    for f in filenames:
        delayed_a, delayed_b, delayed_divs_a, delayed_divs_b = get_data(f)
        dfs_a.append(delayed_a)
        dfs_b.append(delayed_b)
        dfs_divs_a.append(da.from_delayed(delayed_divs_a, shape=(2,), dtype=float))
        dfs_divs_b.append(da.from_delayed(delayed_divs_b, shape=(2,), dtype=float))

    df_a = dd.from_delayed(dfs_a)
    df_b = dd.from_delayed(dfs_b)

    # After concatenation the arrays look like [min_0, max_0, min_1, max_1, ...]
    divs_a_list = list(da.concatenate(dfs_divs_a, axis=0).compute())
    divs_b_list = list(da.concatenate(dfs_divs_b, axis=0).compute())

    # Divisions are the min of every partition plus the max of the last one
    divs_a = divs_a_list[0::2]
    divs_a.append(divs_a_list[-1])

    divs_b = divs_b_list[0::2]
    divs_b.append(divs_b_list[-1])

    df_a = df_a.set_index("nums", sorted=True, divisions=divs_a)
    df_b = df_b.set_index("nums", sorted=True, divisions=divs_b)
    
    print("df_a:")
    print(df_a.compute())
    print()
    print("df_b:")
    print(df_b.compute())
    print()
    
    client.shutdown()

Output:

df_a:
     letters
nums        
100        a
200        a
600        a
1000       a
2000       a
6000       a

df_b:
     letters
nums        
300        b
400        b
500        b
3000       b
4000       b
5000       b

I originally asked about dd.from_map because the dd.from_delayed documentation says it may be deprecated in the future, but I haven’t been able to find an equivalent, so I think I need to use dd.from_delayed for my use case. In my real code, get_data reads huge binary files, and I really don’t want it to have to do that more than once. Once the data is split into “a” and “b”, I know each piece is naturally sorted by “nums”, but only after that split. Hence the desire to split inside get_data while also grabbing the min and max values of each piece to use as divisions when setting my index later, simply to save set_index time. It is quite possible that I have massively overcomplicated this, and if so, I would love to hear from anyone with a simpler solution. Thank you!


That is really interesting. I’m not sure about this deprecation warning or about abandoning from_delayed; do any maintainers have advice/opinions? cc @fjetter, @Patrick, @scharlottej13.

Anyway, I think your solution is good and your reasoning is correct!
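
One small refinement you might consider: passing meta to dd.from_delayed lets Dask skip inferring the schema from the first delayed partition. A sketch, assuming the column layout from your example:

import pandas as pd
import dask.dataframe as dd

# Empty frame describing the expected columns and dtypes
# (taken from the toy example; adjust to your real schema)
meta = pd.DataFrame({'nums': pd.Series(dtype='int64'),
                     'letters': pd.Series(dtype='object')})

df_a = dd.from_delayed(dfs_a, meta=meta)
df_b = dd.from_delayed(dfs_b, meta=meta)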