Strategy for processing geo data

Hi!
I have a problem that Dask seems a good fit to solve, but I’m having trouble figuring out how to go about setting up a solution.

I’ll try to explain the problem without getting too deep into the weeds. I have two sets of geographic point data that I would like to compare, interpolating values from one onto the other.
The first data set is a DataFrame (dst_df) representing a regular grid of roughly 4000x8000 points across the globe. The second is a DataFrame (src_df) of 5,000,000 random points spread across the globe, each with a tier, category, and value assigned.

For every point in the regular grid I perform an inverse distance weighted calculation of the random point values to find the value at the regular grid point.
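
For reference, a minimal sketch of that calculation for a single grid point might look like the following. It is not the code from this post: the haversine distance, the `power=2` exponent, the choice of `k` nearest points, and the function names are all assumptions made for illustration.

```python
import numpy as np

EARTH_RADIUS_KM = 6371.0

def haversine_km(lat, lon, lats, lons):
    """Great-circle distance in km from one point to many points (degrees in)."""
    lat, lon, lats, lons = map(np.radians, (lat, lon, lats, lons))
    a = (np.sin((lats - lat) / 2) ** 2
         + np.cos(lat) * np.cos(lats) * np.sin((lons - lon) / 2) ** 2)
    return 2 * EARTH_RADIUS_KM * np.arcsin(np.sqrt(np.clip(a, 0.0, 1.0)))

def idw_value(lat, lon, src_lats, src_lons, src_values, k=3, power=2.0):
    """Inverse distance weighted value from the k nearest source points."""
    d = haversine_km(lat, lon, src_lats, src_lons)
    k = min(k, d.size)
    nearest = np.argpartition(d, k - 1)[:k]  # indices of the k smallest distances
    d_k, v_k = d[nearest], src_values[nearest]
    if np.any(d_k == 0):
        # The grid point coincides with a source point: use its value directly.
        return float(v_k[d_k == 0][0])
    w = 1.0 / d_k ** power
    return float(np.sum(w * v_k) / np.sum(w))
```

Each call scans every source point once, so the cost per grid point grows with the size of src_df.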

Is Dask the right tool for this?
What I’m doing ends up gobbling up all my RAM. I can break it up into smaller groups of futures, but it still takes a very long time. I’m not sure what my approach should be. Does anyone have any suggestions?

from dask.distributed import Client

client = Client()  # start a local cluster and connect to it

# Three chained tasks are submitted for every grid point.
futures = []
for point in dst_df.itertuples():
    distances = client.submit(distance_to_point, point, src_df)
    close_distances = client.submit(get_close_distances, distances, src_df)
    point_value = client.submit(get_point_value, close_distances, src_df)
    futures.append(point_value)
results = client.gather(futures)

Thanks!

Hi @5Nomads, welcome to the Dask community!

From what I understand, you are submitting 3 tasks for each of your 5 million random points, so 15 million tasks in the end?

How long does each of these tasks take?

A few suggestions I can think of:

  • Did you try using Pandas only? What was the outcome?
  • You should probably scatter the src_df dataframe ahead of time so it is not sent with every task. How big is it? Something like 120MB if float32?
  • I would also submit just one function per point; I don’t think you need three different tasks.
  • And finally, it would probably be more efficient to read your random points into a Dask DataFrame, wrap all this code in a map_partitions call, and work with Pandas in each partition, especially if the computation on a given point is really short.
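
The map_partitions idea could be sketched roughly as follows. This is only a sketch under assumptions: it partitions the grid (dst_df) because each output value belongs to a grid point, it uses a plain inverse distance weighting over the `k` nearest source points, and the names `interpolate_partition` and `interpolate_with_dask` are hypothetical. The column names follow the dst_df and src_df used in this thread.

```python
import numpy as np
import pandas as pd

def interpolate_partition(part, src, k=3, power=2.0):
    """Plain pandas/NumPy work for one partition of grid points.

    Assumes `src` has at least k rows.
    """
    src_lat = np.radians(src["latitude"].to_numpy(dtype="float64"))
    src_lon = np.radians(src["longitude"].to_numpy(dtype="float64"))
    src_val = src["value"].to_numpy(dtype="float64")
    lats = np.radians(part["lat"].to_numpy())
    lons = np.radians(part["lon"].to_numpy())
    out = np.empty(len(part), dtype="float64")
    for i, (lat, lon) in enumerate(zip(lats, lons)):
        a = (np.sin((src_lat - lat) / 2) ** 2
             + np.cos(lat) * np.cos(src_lat) * np.sin((src_lon - lon) / 2) ** 2)
        # Angular distance in radians; the weights are normalized below,
        # so the Earth radius factor would cancel out anyway.
        d = 2 * np.arcsin(np.sqrt(np.clip(a, 0.0, 1.0)))
        nearest = np.argpartition(d, k - 1)[:k]
        w = 1.0 / np.maximum(d[nearest], 1e-12) ** power
        out[i] = np.sum(w * src_val[nearest]) / np.sum(w)
    return pd.Series(out, index=part.index, name="value")

def interpolate_with_dask(dst_df, src_df, npartitions=64):
    """Run interpolate_partition over dst_df, one task per partition."""
    import dask  # imported here so the helper above also works without Dask
    import dask.dataframe as dd

    ddf = dd.from_pandas(dst_df, npartitions=npartitions)
    # Wrapping src_df in delayed makes it a single node in the task graph.
    result = ddf.map_partitions(
        interpolate_partition, dask.delayed(src_df), meta=("value", "f8")
    )
    return result.compute()
```

With this layout the number of tasks equals the number of partitions rather than the number of points, and each task does ordinary Pandas and NumPy work on its slice.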

Thank you for the reply and suggestions. It’s actually worse: I am submitting 3 tasks for each of the 32,000,000 points in my grid. So 96 million tasks in total.
I did try Pandas. It works well but is very slow: the estimated time to completion was well over 40 days.

Simplifying to a single task, instead of 3, sounds like a good idea. I’ll try that.

Ultimately I need to get back an array of 32,000,000 values, each obtained by interpolating the values of the 3 closest points from each tier/category in the src_df.
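
One possible reading of "the 3 closest points from each tier/category" is sketched below for a single grid point. It assumes the distances have already been computed into a hypothetical `dist` column, and it stops at one estimate per (tier, category) group, since how those estimates are combined into the final value is not described here. The function name `per_group_idw` is also hypothetical.

```python
import numpy as np
import pandas as pd

def per_group_idw(src_with_dist, k=3, power=2.0):
    """IDW estimate per (tier, category) from the k nearest rows of each group.

    `src_with_dist` needs the columns tier, category, value, and a
    hypothetical `dist` column with the distance to the grid point.
    """
    keys = ["tier", "category"]
    # Sort by distance, then keep the first k rows of every group.
    nearest = (src_with_dist.sort_values("dist")
               .groupby(keys, observed=True)
               .head(k))
    weights = 1.0 / np.maximum(nearest["dist"], 1e-12) ** power
    nearest = nearest.assign(w=weights, wv=weights * nearest["value"])
    sums = nearest.groupby(keys, observed=True)[["wv", "w"]].sum()
    return sums["wv"] / sums["w"]  # Series indexed by (tier, category)
```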

Each of the 32-million points has to be compared to all 5-million+ points in the src_df.

The regular grid (dst) size is:

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 32060028 entries, 0 to 32060027
Data columns (total 5 columns):
 #   Column     Dtype  
---  ------     -----  
 0   point_id   int64  
 1   lat        float64
 2   lon        float64
 3   elevation  int64  
 4   value      float64
dtypes: float64(3), int64(2)
memory usage: 1.2 GB

and the random points (src):

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1090799 entries, 0 to 1090798
Data columns (total 6 columns):
 #   Column     Non-Null Count    Dtype   
---  ------     --------------    -----   
 0   category   1090799 non-null  category
 1   latitude   1090799 non-null  float32 
 2   longitude  1090799 non-null  float32 
 3   elevation  1090799 non-null  float32 
 4   value      1090799 non-null  float32 
 5   tier       1090799 non-null  int64   
dtypes: category(1), float32(4), int64(1)
memory usage: 26.0 MB

I’m no expert in this field, but there are probably other ways to interpolate input points onto a regular grid. See for example PyInterp.

32 million tasks is already a lot. You’ll probably have to batch the calls or use map_partitions if you stick with that approach.
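
Batching the calls might look something like this sketch, which scatters src_df once and submits one task per chunk of grid rows. The `process_chunk(chunk_df, src_df)` function is hypothetical (it stands for whatever computes one value per grid row), and the chunk size of 100,000 rows is an arbitrary assumption to tune.

```python
import numpy as np

def chunk_bounds(n_rows, chunk_size):
    """Return (start, stop) row ranges that cover n_rows in order."""
    return [(start, min(start + chunk_size, n_rows))
            for start in range(0, n_rows, chunk_size)]

def run_in_batches(dst_df, src_df, process_chunk, chunk_size=100_000):
    """Submit one task per chunk of grid rows instead of three per row.

    `process_chunk(chunk_df, src_df)` is a hypothetical user function that
    returns one value per row of `chunk_df`.
    """
    from dask.distributed import Client  # needs dask[distributed] installed

    with Client() as client:
        # Send src_df to every worker once, instead of with every task.
        src_future = client.scatter(src_df, broadcast=True)
        futures = [
            client.submit(process_chunk, dst_df.iloc[start:stop], src_future)
            for start, stop in chunk_bounds(len(dst_df), chunk_size)
        ]
        # gather preserves submission order, so results line up with dst_df.
        return np.concatenate(client.gather(futures))
```

At 100,000 rows per chunk, a grid of about 32 million points becomes a few hundred tasks.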

That makes sense. I’ll definitely check out that PyInterp project.

I have a different solution that’s actually pretty fast, I was just hoping to utilize Dask and AWS’s EMR to make it a little more off the shelf.
Right now I end up using Geohashes (mentioned in the PyInterp project) to slice the globe into chunks or regions based on the geohash values, and interpolate that way. This dramatically reduces calculation time, but like I mentioned, I was hoping for something Dask related that didn’t have me doing quite so much hand-waving and moving data around manually.
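
For readers unfamiliar with the idea, here is a minimal sketch of geohash-based bucketing. It is not my actual code: the encoder is the standard geohash algorithm written out in plain Python, and the `precision=3` cell size and the helper name `group_by_geohash` are assumptions for illustration.

```python
from collections import defaultdict

_BASE32 = "0123456789bcdefghjkmnpqrstuvwxyz"

def geohash_encode(lat, lon, precision=3):
    """Encode a lat/lon pair (degrees) as a geohash string."""
    lat_lo, lat_hi = -90.0, 90.0
    lon_lo, lon_hi = -180.0, 180.0
    chars, bits, n_bits, use_lon = [], 0, 0, True
    while len(chars) < precision:
        # Bits alternate between longitude and latitude, longitude first.
        if use_lon:
            mid = (lon_lo + lon_hi) / 2
            if lon >= mid:
                bits, lon_lo = (bits << 1) | 1, mid
            else:
                bits, lon_hi = bits << 1, mid
        else:
            mid = (lat_lo + lat_hi) / 2
            if lat >= mid:
                bits, lat_lo = (bits << 1) | 1, mid
            else:
                bits, lat_hi = bits << 1, mid
        use_lon = not use_lon
        n_bits += 1
        if n_bits == 5:  # every 5 bits become one base-32 character
            chars.append(_BASE32[bits])
            bits, n_bits = 0, 0
    return "".join(chars)

def group_by_geohash(points, precision=3):
    """Map each geohash cell to the indices of the (lat, lon) points in it."""
    cells = defaultdict(list)
    for i, (lat, lon) in enumerate(points):
        cells[geohash_encode(lat, lon, precision)].append(i)
    return dict(cells)
```

A grid point near the edge of a cell can have its nearest source points in an adjacent cell, so neighboring cells have to be searched as well.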

Thanks again for your help!

Well, with Dask and a few changes to your current code, you could probably speed up your computation by roughly the number of cores and processes you use. I suggested other options because I thought there were other parts that could be optimized. I’m glad you found a way to do it!
