Hello all,
I’m using Dask to analyze a large set of community science data. Currently, I am trying to run some basic summary statistics on these data by location. Some locations have only one or two data points registered and some have thousands. What I want to do is grab each location, calculate some summary data for it (how many data points were collected, how many different collectors were involved, averages of what they found, etc.), and get a DataFrame back that I can write out as one or more CSVs (our visualization tools require CSVs).
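For concreteness, the per-location summary looks roughly like this. The function and column names below are made up for illustration; the real data has more fields.

import pandas as pd

# Illustrative sketch only: for one location's records I build a one-row
# pandas DataFrame with a record count, the number of distinct collectors,
# and the mean of the measured value.
def summarize_location(records: pd.DataFrame) -> pd.DataFrame:
    return pd.DataFrame({
        "location": [records["location"].iloc[0]],
        "n_records": [len(records)],
        "n_collectors": [records["collector"].nunique()],
        "mean_value": [records["value"].mean()],
    })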
Ultimately, since locations are actual places on a map, I also want the summary for each location to look at the nearest other sampled locations, apply some weighting functions, and compare each point to its neighbors.
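That neighbor step isn’t the part that is failing yet; for reference, I was picturing something like a k-nearest-neighbor query over the location coordinates. This is only a sketch, and the x/y coordinate columns are assumed:

from scipy.spatial import cKDTree

# Sketch of the planned neighbor step: for each summarized location, find the
# indices of (and distances to) its k nearest neighbors by coordinate, so their
# summaries can be weighted and compared later.
def nearest_neighbors(summary_df, k=5):
    coords = summary_df[["x", "y"]].to_numpy()   # assumed planar coordinate columns
    tree = cKDTree(coords)
    dists, idxs = tree.query(coords, k=k + 1)    # first hit is the point itself
    return dists[:, 1:], idxs[:, 1:]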
I started off by creating a DataFrame using from_delayed:
import dask.dataframe as dd

# location_processor is wrapped with dask.delayed and returns a small pandas
# DataFrame of summary rows for a single location
small_chunks = [location_processor(location) for location in location_list]
ddf = dd.from_delayed(small_chunks)
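After that, I write the result out as CSV, roughly like this (the filename pattern is just an example):

# Writes one CSV per partition using the * placeholder in the name
ddf.to_csv("location-summaries-*.csv", index=False)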
On a small sample of the data this worked fine. Once I started running it on a larger set of the data, though, the run started crashing after warnings like:
distributed.worker - WARNING - Unmanaged memory use is high. This may indicate a memory leak or the memory may not be released to the OS; see https://distributed.dask.org/en/latest/worker.html#memtrim for more information. -- Unmanaged memory: 1.21 GiB -- Worker memory limit: 1.83 GiB
One worker is always the one at fault (based on the dashboard). I assume it is holding on to the results that make up the DataFrame until it runs out of memory. Based on that, I tried building the DataFrame in smaller chunks and then concatenating them all (roughly as sketched below), but no luck. Same thing.
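For reference, the chunked version looked roughly like this (the batch size is arbitrary):

import dask.dataframe as dd

# Roughly what I tried: build several smaller Dask DataFrames from batches of
# the delayed results, then concatenate them into one DataFrame at the end.
batch_size = 500  # arbitrary
partial_ddfs = []
for start in range(0, len(small_chunks), batch_size):
    batch = small_chunks[start:start + batch_size]
    partial_ddfs.append(dd.from_delayed(batch))
ddf = dd.concat(partial_ddfs)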
Clearly, I’m doing this wrong. What’s the right method?