Hey, I am a bit new to Dask, so apologies if this is a very basic question. I have been trying to parallelize my workflow, which goes along the lines of: read in a big dataset → filter it → convert a few columns to tensors. While trying to use Dask DataFrames to filter, I found there was no way to use .iloc to select rows. Instead I tried to use repartition, but when I call .compute() later on, it returns this error, which is completely baffling me.
Here’s the code for reference:
MAX_LEN = 300

users = df_clean.userID.unique().compute()
paths = []
labels = []

@dask.delayed(nout=2)
def get_paths(df, user):
    temp = df.loc[df.userID == user]
    npart = round(len(temp) / MAX_LEN)
    if npart == 0:
        return (temp.questionID.values, temp.result.values)
    else:
        parted_df = temp.repartition(npartitions=npart)
        return (parted_df.partitions[0].questionID.values,
                parted_df.partitions[0].result.values)

@dask.delayed
def convert_to_tensor(x):
    return torch.tensor(x)

for user in tqdm(users):
    path, label = get_paths(df_clean, user)
    paths.append(convert_to_tensor(path))
    labels.append(convert_to_tensor(label))
Any help, or even feedback about using Dask in this way, would be appreciated.
EDIT: Error occurs when I call dask.compute(*paths)
@ribhu97 Welcome to discourse, and thanks for your question!
I’m still trying to reproduce this error, and it’ll be really helpful if you could share the complete error message/traceback and maybe create a minimal, verifiable example using some toy data like:
import pandas as pd
import dask.dataframe as dd

df = pd.DataFrame({'userID': ['1', '2', '3'], 'questionID': ['a', 'b', 'c'], 'result': ['99', '98', '97']})
ddf = dd.from_pandas(df, npartitions=2)
In general, I think you may find this blog post on filtering Dask DataFrames interesting.
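For reference, row-wise selection in a Dask DataFrame is usually done with a boolean mask or .loc rather than .iloc (which Dask only supports for column selection). A minimal sketch using the toy frame above:

# Filter rows with a boolean mask; positional row selection via .iloc
# isn't supported on Dask DataFrames, but column selection with .iloc is.
filtered = ddf[ddf.userID == '1']
print(filtered.compute())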
Firstly, thanks for the help and the resources you shared. The problem is that I tried to create a toy dataset and could not reproduce the error myself, so I'm not sure how that can be resolved.
But I was able to get around the issue of filtering the rows, albeit in a pretty inefficient manner, in pandas itself:
trunc_df = pd.DataFrame()
for user in tqdm(df.userID.unique()):
    user_df = df.loc[df.userID == user].iloc[-MAX_LEN:]
    trunc_df = pd.concat([trunc_df, user_df])
So the get_paths function then becomes:
@dask.delayed(nout=2)
def get_paths(df, user):
    temp = df.loc[df.userID == user]
    return (temp.questionID.values,
            temp.result.values)
But later, when I try to run the compute part again, I get this warning, after which my kernel crashes:
UserWarning: Large object of size 3.15 GiB detected in task graph:
  (      questionID_2593  questionID_2595  questi … columns], 3764)
Consider scattering large objects ahead of time with
    client.scatter to reduce scheduler burden and
    keep data on workers

    future = client.submit(func, big_data)    # bad
    big_future = client.scatter(big_data)     # good
    future = client.submit(func, big_future)
I even tried to scatter the paths list with client.scatter(paths), but that crashes too. Does this have to do with the number of partitions of the DataFrame?
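For reference, the pattern from the warning applied to this loop would look something like the sketch below (assuming a dask.distributed Client named client and that df_clean is an in-memory pandas DataFrame at this point):

# Sketch of what the warning suggests: scatter the large source DataFrame
# once, then pass the resulting future into the delayed calls so the
# ~3 GiB object isn't embedded in the task graph for every task.
big_future = client.scatter(df_clean, broadcast=True)

paths, labels = [], []
for user in tqdm(users):
    path, label = get_paths(big_future, user)
    paths.append(convert_to_tensor(path))
    labels.append(convert_to_tensor(label))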
@ribhu97 Combining different Dask collections is tricky; we usually recommend switching between them to suit your workflow instead of using them together. In your example, you're passing a Dask DataFrame to a Delayed function. These are two different collections, and I'm guessing that's what's causing issues here.
On closer inspection, it looks like you're filtering the DataFrame for each unique user; I think you can use a groupby-apply here to accomplish the same thing:
import numpy as np
import pandas as pd
import dask.dataframe as dd

df = pd.DataFrame({'userID': np.repeat(np.array([1, 2, 3]), 400),
                   'questionID': np.arange(0, 1200),
                   'result': np.arange(1200, 2400),
                   })

ddf = dd.from_pandas(df, npartitions=2)

ddf.groupby('userID').apply(lambda x: x[['questionID', 'result']].head(300)).compute()  # then convert to tensors
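To finish the "then convert to tensors" step from the comment above, one possibility (just a sketch, assuming torch is installed and the truncated result fits comfortably in memory) is to compute the truncated frame to pandas and build one tensor pair per user:

import torch

# Keep userID in the applied result so we can group on it after computing.
truncated = (ddf.groupby('userID')
                .apply(lambda x: x[['userID', 'questionID', 'result']].head(300))
                .compute()
                .reset_index(drop=True))

paths, labels = [], []
for _, user_df in truncated.groupby('userID'):
    paths.append(torch.tensor(user_df['questionID'].to_numpy()))
    labels.append(torch.tensor(user_df['result'].to_numpy()))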
Also, sorry for the delay in my response!