Best method to create a DataFrame with calculated data added to it

Hello all,
I’m using Dask to analyze a large set of community science data. Currently, I am trying to run some basic summary statistics on these data by location. Some locations have only one or two data points registered, and some have thousands. What I want to do is grab each location, calculate some summary data for it (how many data points were collected, how many different collectors were involved, averages of what they found, etc.), and get a DataFrame back that I can write out as one or more CSVs (our visualization tools require CSVs).
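
For the CSV step at the end, Dask's to_csv can write either one file per partition or a single combined file. A minimal sketch, where summary_ddf is a hypothetical stand-in for the finished summary:

import dask.dataframe as dd
import pandas as pd

# Hypothetical stand-in for the computed per-location summary
summary_ddf = dd.from_pandas(
    pd.DataFrame({'Location': ['A', 'B'], 'Observers': [3, 1]}),
    npartitions=2,
)

summary_ddf.to_csv('location-summary-*.csv')                   # one CSV per partition
summary_ddf.to_csv('location-summary.csv', single_file=True)   # a single combined CSV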

Ultimately, since locations are actual places on a map, I want my summary data to also look at the nearest other sampled locations, run some weighting functions, and compare each point to its neighbors.

I started off by creating a DataFrame using from_delayed:

small_chunks = [location_processor(location) for location in location_list]
ddf = dd.from_delayed(small_chunks)
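
One detail worth knowing about from_delayed: if no meta is given, Dask computes one of the delayed objects up front just to infer the column names and dtypes. Supplying meta avoids that. A sketch, with column names assumed from the location_processor shown later:

import pandas as pd
import dask.dataframe as dd

# Empty frame describing the expected output columns (names are assumptions)
meta = pd.DataFrame({'Location': pd.Series(dtype='object'),
                     'Observers': pd.Series(dtype='int64'),
                     'Minutes': pd.Series(dtype='float64')})

# small_chunks is the list of delayed results built above
ddf = dd.from_delayed(small_chunks, meta=meta)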

On a small sample of the data it worked fine. Once I started running it on a larger set of data, I started getting crashes preceded by warnings like:

distributed.worker - WARNING - Unmanaged memory use is high. This may indicate a memory leak or the memory may not be released to the OS; see https://distributed.dask.org/en/latest/worker.html#memtrim for more information. -- Unmanaged memory: 1.21 GiB -- Worker memory limit: 1.83 GiB

One worker is always at fault (based on the dashboard). I assume it is holding on to all the results coming back to build the DataFrame until it runs out of memory. Based on that, I tried making the code create the DataFrame in chunks and then concatenate them all, but no luck: same thing.
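
For reference, the "memtrim" page that warning links to describes, for Linux workers, manually asking glibc to hand freed memory back to the OS, roughly like this:

import ctypes

def trim_memory() -> int:
    libc = ctypes.CDLL('libc.so.6')
    return libc.malloc_trim(0)

# Run on every worker; `client` is assumed to be your distributed.Client
client.run(trim_memory)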

Clearly, I’m doing this wrong. What’s the right method?

@Polemaetus Welcome to Discourse! Would it be possible for you to share a minimal example of your Dask workflow? A lot of things can result in unmanaged memory, so it’s tricky to diagnose what’s happening in your case. I’m especially curious about your location_processor function, which appears to be a delayed function?

On a general note, I’d encourage you to check out the Delayed and DataFrame best practices docs. They cover some common scenarios. :smile:
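
To make that concrete: when the per-location statistics are standard reductions, one pattern those docs point toward is skipping delayed entirely and expressing the summaries as groupby aggregations on the Dask DataFrame itself. A rough sketch, where the file pattern and column names are assumptions based on the snippets below:

import dask
import dask.dataframe as dd
import pandas as pd

all_records = dd.read_csv('records-*.csv')   # hypothetical file pattern

# Built-in groupby reductions run as tree reductions across partitions
observers = all_records.groupby('LocationID')['Observer'].nunique()
mean_minutes = all_records.groupby('LocationID')['Minutes'].mean()

# A single compute() materializes both results as pandas Series
observers, mean_minutes = dask.compute(observers, mean_minutes)
summary = pd.DataFrame({'Observers': observers, 'Minutes': mean_minutes})
summary.to_csv('location-summary.csv')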

Currently, I am doing almost nothing except this from_delayed call.
I have a dd.read_csv() to get the data and then I pass it right on to this:

small_chunks = [location_processor(location) for location in location_list]
ddf = dd.from_delayed(small_chunks)
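
For context, the setup ahead of this presumably looks something like the following, where the file pattern and the derivation of location_list are guesses:

import dask.dataframe as dd

all_records = dd.read_csv('records-*.csv')   # hypothetical file pattern

# Guessing that location_list holds the unique location IDs
location_list = all_records['LocationID'].dropna().unique().compute()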

The (delayed) location_processor is meant to pull out a subset DataFrame and create some summary stats for it. The from_delayed call should (hopefully) create a new DataFrame with one row per location and its summary stats. This is where everything breaks, so I haven’t ever gotten any further.
location_processor looks like:

from dask import delayed
import pandas as pd

@delayed
def location_processor(id):
    # Pull this location's rows out of all_records, then summarize them
    point = all_records.loc[all_records['LocationID'] == id]
    observers = len(point['Observer'].dropna().unique())
    minutes_per_event = point.groupby('EventID').Minutes.mean().compute()
    final_frame = pd.DataFrame({'Location': [id],
                                'Observers': [observers],
                                'Minutes': [minutes_per_event]})
    return final_frame

There are actually more .dropna().unique() calls in there, but this is a minimal example. Everything right now is either a .mean().compute() or a .dropna().unique().
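
One pattern worth flagging in that snippet: if all_records is the Dask DataFrame from dd.read_csv(), each delayed task ends up running its own nested .compute() over the whole collection, which is generally discouraged and fits the memory pressure described above. A hedged alternative is to let Dask do the grouping and hand each location's rows to a plain pandas function. Column names are taken from the snippet above, and the 'Minutes' line guesses at the intent (a mean of the per-event means):

import pandas as pd

def summarize(group):
    # `group` arrives as a plain pandas DataFrame holding one location's rows,
    # so nothing in here triggers a nested Dask computation
    return pd.Series({
        'Observers': float(group['Observer'].dropna().nunique()),
        'Minutes': group.groupby('EventID')['Minutes'].mean().mean(),
    })

# Location IDs become the index of the resulting Dask DataFrame
summary = all_records.groupby('LocationID').apply(
    summarize,
    meta={'Observers': 'float64', 'Minutes': 'float64'},
)
summary.to_csv('location-summary-*.csv')   # one CSV per partition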