Hello,
I have read on Stack Overflow that the delayed decorator has an nout argument which allows a delayed function to return two dataframes. I have also read in the docs that dask.dataframe.from_delayed might be deprecated in the future and that from_map is preferred. Does from_map have an equivalent of from_delayed’s nout functionality that would allow me to return two dataframes?
I’m not sure I’m following correctly. from_delayed does not have an nout functionality.
Could you write at least a pseudo example of what you are trying to achieve? Do you want to build a Dask DataFrame using from_delayed on a Delayed object with multiple outputs?
Thank you for your reply. Here’s a better explanation of what I’m trying to do, with a minimal reproducible example:
import pandas as pd
import dask.dataframe as dd
import dask
from dask.distributed import Client
def get_data(fname):
    # example data; the real data comes from a file format not supported by dask dataframe directly
    df = pd.DataFrame(data=[[1, 'a'], [3, 'b'], [2, 'a'], [6, 'a'], [4, 'b'], [5, 'b']],
                      columns=['nums', 'letters'])
    df['nums'] = df['nums'] * fname
    df_a = df[df['letters'] == 'a']
    df_b = df[df['letters'] == 'b']
    return (df_a, df_b)

if __name__ == "__main__":
    client = Client()
    filenames = [100, 1000]

    df = dd.from_map(get_data, filenames, meta=tuple)
    df_a = df[0]
    df_b = df[1]

    # Do lots of other things with df_a and df_b lazily
    print(df_a.compute())
    print(df_b.compute())

    client.shutdown()
Here’s the traceback:
---------------------------------------------------------------------------
AttributeError Traceback (most recent call last)
File ~/.local/lib/python3.10/site-packages/dask_expr/_core.py:470, in Expr.__getattr__(self, key)
469 try:
--> 470 return object.__getattribute__(self, key)
471 except AttributeError as err:
File /usr/lib/python3.10/functools.py:981, in cached_property.__get__(self, instance, owner)
980 if val is _NOT_FOUND:
--> 981 val = self.func(instance)
982 try:
File ~/.local/lib/python3.10/site-packages/dask_expr/_expr.py:2062, in Projection._meta(self)
2059 if not isinstance(self.operand("columns"), (list, slice)) and not hasattr(
2060 self.operand("columns"), "dtype"
2061 ):
-> 2062 return meta_nonempty(self.frame._meta).iloc[0]
2063 # Avoid column selection for Series/Index
AttributeError: 'str' object has no attribute 'iloc'
During handling of the above exception, another exception occurred:
RuntimeError Traceback (most recent call last)
Cell In[1], line 23
19 filenames = [100,1000]
21 df = dd.from_map(get_data,filenames,meta=tuple)
---> 23 df_a = df[0]
24 df_b = df[1]
26 print(df_a.compute())
File ~/.local/lib/python3.10/site-packages/dask_expr/_collection.py:413, in FrameBase.__getitem__(self, other)
411 elif isinstance(other, list):
412 other = other.copy()
--> 413 return new_collection(self.expr.__getitem__(other))
File ~/.local/lib/python3.10/site-packages/dask_expr/_collection.py:4803, in new_collection(expr)
4801 def new_collection(expr):
4802 """Create new collection from an expr"""
-> 4803 meta = expr._meta
4804 expr._name # Ensure backend is imported
4805 return get_collection_type(meta)(expr)
File ~/.local/lib/python3.10/site-packages/dask_expr/_core.py:475, in Expr.__getattr__(self, key)
471 except AttributeError as err:
472 if key.startswith("_meta"):
473 # Avoid a recursive loop if/when `self._meta*`
474 # produces an `AttributeError`
--> 475 raise RuntimeError(
476 f"Failed to generate metadata for {self}. "
477 "This operation may not be supported by the current backend."
478 )
480 # Allow operands to be accessed as attributes
481 # as long as the keys are not already reserved
482 # by existing methods/properties
483 _parameters = type(self)._parameters
RuntimeError: Failed to generate metadata for ArrowStringConversion(frame=FromMap(b05e124))[0]. This operation may not be supported by the current backend.
Right now, I have a function that works and returns the whole dataframe, and I just split df into df_a and df_b lazily after the from_map call. But I would also like to return some information about the min and max values in the nums column of df_a and df_b, to be given as divisions when setting my index for the dataframes. I thought it might be most efficient to gather that information within my get_data function. But the more I think about this, the more I think that my overall approach is wrong.
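For context, here is a stripped-down sketch of that currently-working approach (the get_data_single body below is just a stand-in for my real file reader, and the explicit meta is optional):

import pandas as pd
import dask.dataframe as dd

def get_data_single(fname):
    # stand-in for the real reader: returns one pandas DataFrame per file
    df = pd.DataFrame(data=[[1, 'a'], [3, 'b'], [2, 'a'], [6, 'a'], [4, 'b'], [5, 'b']],
                      columns=['nums', 'letters'])
    df['nums'] = df['nums'] * fname
    return df

filenames = [100, 1000]
# passing meta explicitly just avoids dask having to infer it
meta = pd.DataFrame({'nums': pd.Series(dtype='int64'),
                     'letters': pd.Series(dtype='object')})
df = dd.from_map(get_data_single, filenames, meta=meta)

df_a = df[df['letters'] == 'a']  # lazy split after from_map
df_b = df[df['letters'] == 'b']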
Yes, dask.delayed does come with some additional functionality. Its signature is:
def delayed(obj, name=None, pure=None, nout=None, traverse=True):
and based on the API docs:
nout : int, optional
The number of outputs returned from calling the resulting ``Delayed``
object. If provided, the ``Delayed`` output of the call can be iterated
into ``nout`` objects, allowing for unpacking of results. By default
iteration over ``Delayed`` objects will error. Note, that ``nout=1``
expects ``obj`` to return a tuple of length 1, and consequently for
``nout=0``, ``obj`` should return an empty tuple
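For example, here is a minimal sketch of how nout unpacking works (a toy function, not tied to your data):

import dask

@dask.delayed(nout=2)
def split(x):
    # the function must return a tuple whose length matches nout
    return x * 2, x * 3

a, b = split(10)           # unpacking yields two separate Delayed objects
print(dask.compute(a, b))  # (20, 30)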
That being said, I’m not sure what you are trying to achieve exactly. Do you want to return a tuple of dataframes instead of a single dataframe for every delayed object?
Thank you for your reply. I was able to achieve what I’m trying to do using dask.delayed with nout. Here is the working minimal reproducible example:
import pandas as pd
import dask.dataframe as dd
import dask
from dask.distributed import Client
import dask.array as da
import numpy as np

@dask.delayed(nout=4)
def get_data(fname):
    df = pd.DataFrame(data=[[1, 'a'], [3, 'b'], [2, 'a'], [6, 'a'], [4, 'b'], [5, 'b']],
                      columns=['nums', 'letters'])
    df['nums'] = df['nums'] * fname
    df_a = df[df['letters'] == 'a']
    df_b = df[df['letters'] == 'b']
    # min/max of each partition, later used as divisions when setting the index
    divs_a = np.array([df_a['nums'].min(), df_a['nums'].max()])
    divs_b = np.array([df_b['nums'].min(), df_b['nums'].max()])
    return (df_a, df_b, divs_a, divs_b)

if __name__ == "__main__":
    client = Client()
    filenames = [100, 1000]

    dfs_a = []
    dfs_b = []
    dfs_divs_a = []
    dfs_divs_b = []
    for f in filenames:
        delayed_a, delayed_b, delayed_divs_a, delayed_divs_b = get_data(f)
        dfs_a.append(delayed_a)
        dfs_b.append(delayed_b)
        dfs_divs_a.append(da.from_delayed(delayed_divs_a, shape=(2,), dtype=float))
        dfs_divs_b.append(da.from_delayed(delayed_divs_b, shape=(2,), dtype=float))

    df_a = dd.from_delayed(dfs_a)
    df_b = dd.from_delayed(dfs_b)

    # concatenated [min0, max0, min1, max1, ...] across partitions
    divs_a_list = list(da.concatenate(dfs_divs_a, axis=0).compute())
    divs_b_list = list(da.concatenate(dfs_divs_b, axis=0).compute())
    # divisions = each partition's minimum, plus the last partition's maximum
    divs_a = divs_a_list[0::2]
    divs_a.append(divs_a_list[-1])
    divs_b = divs_b_list[0::2]
    divs_b.append(divs_b_list[-1])

    df_a = df_a.set_index("nums", sorted=True, divisions=divs_a)
    df_b = df_b.set_index("nums", sorted=True, divisions=divs_b)

    print("df_a:")
    print(df_a.compute())
    print()
    print("df_b:")
    print(df_b.compute())
    print()

    client.shutdown()
Output:
df_a:
letters
nums
100 a
200 a
600 a
1000 a
2000 a
6000 a
df_b:
letters
nums
300 b
400 b
500 b
3000 b
4000 b
5000 b
I originally asked about dd.from_map because I read in the dd.from_delayed documentation that it may be deprecated in the future, but I haven’t been able to find an equivalent, so I think I need to use dd.from_delayed for my use case. In my real code, get_data is reading in huge binary files, and I really don’t want it to have to do that more than once. Once the data is split into “a” and “b”, I know that it is naturally sorted by “nums”, but only after that split. Hence the desire to split it in get_data, while also grabbing the min and max values in each to use for “divisions” when setting my index later, simply to save set_index time. It is quite possible that I have massively over complicated this, and if so, would love to hear from anyone who might have a simpler solution. Thank you!
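To make the divisions bookkeeping concrete, here is what the slicing produces for df_a with the toy data above (files 100 and 1000):

divs_a_list = [100, 600, 1000, 6000]  # concatenated [min0, max0, min1, max1] from the two files
divs_a = divs_a_list[0::2]            # each partition's minimum -> [100, 1000]
divs_a.append(divs_a_list[-1])        # plus the overall maximum -> [100, 1000, 6000]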
That is really interesting. I’m not sure about this deprecation warning and about abandoning from_delayed; do some maintainers have any advice/opinion? cc @fjetter, @Patrick, @scharlottej13.
Anyway, I think your solution is good and your thoughts correct!