Hi!
I have a problem that Dask seems a good fit to solve, but I’m having trouble figuring out how to go about setting up a solution.
I’ll try to explain the problem without getting too deep into the weeds. I have two sets of geographic point data that I would like to compare, interpolating values from one to the other.
The first data set is a DataFrame (dst_df) representing a regular grid that is roughly 4000x8000 points across the globe and the second is a DataFrame (src_df) of 5,000,000 random points spread across the globe that have tier, category, and value assigned.
For every point in the regular grid I perform an inverse distance weighted calculation of the random point values to find the value at the regular grid point.
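Concretely, for each grid point x I take the standard inverse distance weighted average of the nearby source values v_i at locations x_i (the exponent p is just whatever power I choose, commonly 2):

$$v(x) = \frac{\sum_i v_i / d(x, x_i)^p}{\sum_i 1 / d(x, x_i)^p}$$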
Is Dask the right tool for this?
What I’m doing ends up gobbling up all my RAM. I can break it up into smaller groups of futures, but it still takes a very long time. I’m not sure what my approach should be. Does anyone have any suggestions?
```python
from dask.distributed import Client

client = Client()

futures = []
for point in dst_df.itertuples():
    # one chain of three tasks per grid point
    distances = client.submit(distance_to_point, point, src_df)
    close_distances = client.submit(get_close_distances, distances, src_df)
    point_value = client.submit(get_point_value, close_distances, src_df)
    futures.append(point_value)

results = client.gather(futures)
```
Thanks!
Hi @5Nomads, welcome to the Dask community!
From what I understand, you are submitting 3 tasks for each of your 5 million random points, so 15 million tasks in total?
How long does each of these tasks take?
A few suggestions I can think of:
- Did you try using Pandas only? What was the outcome?
- You should probably scatter the `src_df` DataFrame ahead of time, so it is not sent along with every task. How big is it, something like 120 MB if float32?
- I would also submit just one method per point; I don’t think you need three separate tasks.
- And finally, it would probably be more efficient to read your random points into a Dask DataFrame and wrap all this code in a `map_partitions` call, working with Pandas inside each partition, especially if the computation on a given point is really short. See the sketch just below.
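To illustrate that last point, here is a minimal sketch of the `map_partitions` pattern. I’ve put the grid DataFrame into the Dask DataFrame just to keep the example short (the same idea applies whichever side you partition); the helper name, the partition count and the naive planar distance are all placeholders:

```python
import numpy as np
import pandas as pd
import dask.dataframe as dd

def idw_partition(grid_part, src, power=2.0):
    # Interpolate each grid point in this partition against all source points,
    # using plain NumPy/Pandas inside the partition.
    out = []
    for row in grid_part.itertuples():
        # Naive planar distance, for illustration only.
        d = np.hypot(src["latitude"].to_numpy() - row.lat,
                     src["longitude"].to_numpy() - row.lon)
        w = 1.0 / np.maximum(d, 1e-9) ** power
        out.append(float((w * src["value"].to_numpy()).sum() / w.sum()))
    return pd.Series(out, index=grid_part.index, name="value")

grid_ddf = dd.from_pandas(dst_df, npartitions=500)  # one task per partition
values = grid_ddf.map_partitions(idw_partition, src_df, meta=("value", "f8")).compute()
```

This keeps the number of tasks small (one per partition) and makes each task do enough work to amortize the scheduling overhead.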
Thank you for the reply and suggestions. It’s actually worse: I am submitting 3 tasks for each of the 32,000,000 points in my grid, so 96 million tasks in total.
I did try Pandas; it works, but it is very slow. The estimated time to completion was well over 40 days.
Simplifying to a single task instead of 3 sounds like a good idea. I’ll try that.
Ultimately I need to get back an array of 32,000,000 values, each obtained by interpolating the values of the 3 closest points from each tier/category in the src_df.
Each of the 32-million points has to be compared to all 5-million+ points in the src_df.
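Roughly, the per-tier/category lookup I mean looks like the sketch below (using SciPy’s cKDTree as one possible way to grab the 3 nearest points; the planar lat/lon distance is only for illustration, and the column names match the frames shown below):

```python
import numpy as np
from scipy.spatial import cKDTree

grid_xy = dst_df[["lat", "lon"]].to_numpy()

results = {}
for (tier, cat), group in src_df.groupby(["tier", "category"], observed=True):
    # One KD-tree per tier/category, queried for the 3 nearest source points.
    tree = cKDTree(group[["latitude", "longitude"]].to_numpy())
    dist, idx = tree.query(grid_xy, k=3)
    w = 1.0 / np.maximum(dist, 1e-9) ** 2      # inverse distance weights
    vals = group["value"].to_numpy()[idx]
    results[(tier, cat)] = (w * vals).sum(axis=1) / w.sum(axis=1)
```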
The regular grid (dst) size is:
```
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 32060028 entries, 0 to 32060027
Data columns (total 5 columns):
 #   Column     Dtype
---  ------     -----
 0   point_id   int64
 1   lat        float64
 2   lon        float64
 3   elevation  int64
 4   value      float64
dtypes: float64(3), int64(2)
memory usage: 1.2 GB
```
and the random points (src):
```
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1090799 entries, 0 to 1090798
Data columns (total 6 columns):
 #   Column     Non-Null Count    Dtype
---  ------     --------------    -----
 0   category   1090799 non-null  category
 1   latitude   1090799 non-null  float32
 2   longitude  1090799 non-null  float32
 3   elevation  1090799 non-null  float32
 4   value      1090799 non-null  float32
 5   tier       1090799 non-null  int64
dtypes: category(1), float32(4), int64(1)
memory usage: 26.0 MB
```
I’m no expert in this field, but there are probably other ways to interpolate input points onto a regular grid. See for example PyInterp.
32 million tasks is already quite huge. You’ll probably have to batch the calls or use `map_partitions` if you stick with this approach.
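If you stay with futures, batching could look something like the sketch below. It reuses the helper functions from your first snippet and scatters `src_df` once; the chunk size is just an example:

```python
import numpy as np
from dask.distributed import Client

client = Client()  # or reuse your existing client
src_future = client.scatter(src_df, broadcast=True)  # send src_df to the workers once

def interpolate_chunk(grid_chunk, src):
    # Run the existing per-point logic over one chunk of grid rows.
    out = []
    for point in grid_chunk.itertuples():
        distances = distance_to_point(point, src)
        close_distances = get_close_distances(distances, src)
        out.append(get_point_value(close_distances, src))
    return out

chunk_size = 10_000  # ~3200 tasks instead of 32 million
chunks = [dst_df.iloc[i:i + chunk_size] for i in range(0, len(dst_df), chunk_size)]
futures = [client.submit(interpolate_chunk, chunk, src_future) for chunk in chunks]
results = np.concatenate(client.gather(futures))
```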
That makes sense. I’ll definitely check out that PyInterp project.
I have a different solution that’s actually pretty fast; I was just hoping to use Dask and AWS’s EMR to make it a little more off the shelf.
Right now I end up using Geohashes (mentioned in the PyInterp project) to slice the globe into chunks or regions based on the geohash values and interpolate that way. This dramatically reduces calculation time, but like I mentioned, I was hoping for something Dask-related that didn’t have me doing quite so much hand waving and moving data around manually.
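Roughly, the bucketing looks like the sketch below (I’m using the pygeohash package here just as an example encoder, and ignoring the neighboring-cell lookups that a real interpolation needs at cell boundaries):

```python
import pygeohash as pgh

precision = 3  # roughly 150 km cells; purely illustrative
dst_df["gh"] = [pgh.encode(lat, lon, precision=precision)
                for lat, lon in zip(dst_df["lat"], dst_df["lon"])]
src_df["gh"] = [pgh.encode(lat, lon, precision=precision)
                for lat, lon in zip(src_df["latitude"], src_df["longitude"])]

for gh, grid_cell in dst_df.groupby("gh"):
    # In real code this also pulls in the neighboring geohash cells.
    local_src = src_df[src_df["gh"] == gh]
    # ... interpolate grid_cell against local_src ...
```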
Thanks again for your help!
Well, with Dask and by changing a few things in your current code, you could probably speed up your computation by roughly the number of cores and processes you use with Dask. I was suggesting other options because I thought there were other parts that could be optimized. I’m glad you found some ways to do it!