AttributeError: 'DataFrame' object has no attribute 'repartition'

Hey, I am a bit new to Dask, so apologies if this is a very basic question. I have been trying to parallelize my workflow, which goes along the lines of: read in a big dataset → filter it → convert a few columns to tensors. While trying to use Dask DataFrames for the filtering, I found there was no way to use .iloc to select rows. Instead I tried to use repartition, but when I call .compute() later on, it returns this error, which is completely baffling me.

Here’s the code for reference:

import dask
import torch
from tqdm import tqdm

MAX_LEN = 300

# df_clean is the Dask DataFrame loaded earlier in the workflow
users = df_clean.userID.unique().compute()
paths = []
labels = []
@dask.delayed(nout=2)
def get_paths(df, user):
    temp = df.loc[df.userID == user]
    npart = round(len(temp)/MAX_LEN)
    if npart == 0:
        return (temp.questionID.values, temp.result.values)
    else:
        parted_df = temp.repartition(npartitions=npart)
        return (parted_df.partitions[0].questionID.values,
                parted_df.partitions[0].result.values)
@dask.delayed
def convert_to_tensor(x):
    return torch.tensor(x)

for user in tqdm(users):
    path, label = get_paths(df_clean, user)
    paths.append(convert_to_tensor(path))
    labels.append(convert_to_tensor(label))

Any help, or even general feedback about using Dask in this way, would be appreciated.

EDIT: The error occurs when I call dask.compute(*paths).

@ribhu97 Welcome to discourse, and thanks for your question!

I’m still trying to reproduce this error, and it’ll be really helpful if you could share the complete error message/traceback and maybe create a minimal, verifiable example using some toy data like:

import pandas as pd
import dask.dataframe as dd

df = pd.DataFrame({'userID': ['1', '2', '3'], 'questionID': ['a', 'b', 'c'], 'result': ['99', '98', '97']})
ddf = dd.from_pandas(df, npartitions=2)

In general, I think you may find this blog post on filtering Dask DataFrames interesting.
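In the meantime, one detail that may be relevant: Dask DataFrames support boolean and label-based filtering (with a mask or .loc), but .iloc only supports column selection, not positional row selection. A minimal sketch on the toy ddf above:

# Boolean/label filtering works lazily on a Dask DataFrame;
# positional row selection with .iloc is not supported.
filtered = ddf.loc[ddf['userID'] == '2']
print(filtered.compute())  # returns the matching rows as a pandas DataFrame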

First, thanks for the help and the resources you shared. The problem is that I tried to create a toy dataset and could not reproduce the error myself, so I’m not sure how that can be resolved.
But I was able to get around the row-filtering issue, albeit in a pretty inefficient manner, in pandas itself:

# keep only the last MAX_LEN rows for each user, in plain pandas
trunc_df = pd.DataFrame()
for user in tqdm(df.userID.unique()):
    user_df = df.loc[df.userID == user].iloc[-MAX_LEN:]
    trunc_df = pd.concat([trunc_df, user_df])
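(As a sketch of the same idea without the explicit loop, assuming the same df and MAX_LEN, a single plain-pandas groupby appears to do the same truncation:)

# Sketch: keep the last MAX_LEN rows per user in one pass
# (preserves the original row order rather than concatenating user by user)
trunc_df = df.groupby('userID').tail(MAX_LEN)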

So the get_paths function then becomes:

@dask.delayed(nout=2)
def get_paths(df, user):
    temp = df.loc[df.userID == user]
    return (temp.questionID.values,
            temp.result.values)

But later, when I try to run the compute part again, I get this warning, after which my kernel crashes:

UserWarning: Large object of size 3.15 GiB detected in task graph:
  ( questionID_2593  questionID_2595  questi … columns], 3764)
Consider scattering large objects ahead of time
with client.scatter to reduce scheduler burden and
keep data on workers

    future = client.submit(func, big_data)    # bad

    big_future = client.scatter(big_data)     # good
    future = client.submit(func, big_future)

I even tried to scatter the paths list with client.scatter(paths), but even that crashes. Does this have to do with the number of partitions of the DataFrame?
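For reference, here is a sketch of the pattern the warning seems to be pointing at, assuming a distributed Client named client and the trunc_df built above:

# Sketch: scatter the big DataFrame once so each task carries a small
# future instead of embedding the multi-GiB object in the task graph.
df_future = client.scatter(trunc_df, broadcast=True)

paths, labels = [], []
for user in tqdm(users):
    path, label = get_paths(df_future, user)   # delayed calls accept futures
    paths.append(convert_to_tensor(path))
    labels.append(convert_to_tensor(label))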

@ribhu97 Combining different Dask collections is tricky; we usually recommend switching between them to suit your workflow rather than using them together. In your example, you’re passing a Dask DataFrame to a delayed function. These are two different collections, and I’m guessing that’s what is causing issues here.

On closer inspection, it looks like you’re filtering the DataFrame for each unique user; I think you can use a groupby-apply here to accomplish the same thing:

import numpy as np
import pandas as pd
import dask.dataframe as dd

df = pd.DataFrame({'userID': np.repeat(np.array([1,2,3]), 400),
                   'questionID': np.arange(0, 1200),
                   'result': np.arange(1200, 2400),
                  })

ddf = dd.from_pandas(df, npartitions=2)

ddf.groupby('userID').apply(lambda x: x[['questionID', 'result']].head(300)).compute() # then convert to tensors
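And a rough sketch of the “then convert to tensors” step, assuming PyTorch is installed; truncated is just my name for the computed result above, and userID survives as an index level after the groupby-apply:

import torch

truncated = ddf.groupby('userID').apply(
    lambda x: x[['questionID', 'result']].head(300)).compute()

# One tensor per user, built from the in-memory pandas result
paths  = {u: torch.tensor(g['questionID'].to_numpy())
          for u, g in truncated.groupby(level='userID')}
labels = {u: torch.tensor(g['result'].to_numpy())
          for u, g in truncated.groupby(level='userID')}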

Also, sorry for the delay in my response!
