I have a large dataset (~2TB) of about 750k particle time series. The data is generated by a simulation and written out in essentially random order, so the data for a given time series is scattered all over the dataset. I need to compute the difference of one column between time t and t-1, then find all particles whose maximum absolute difference exceeds a certain threshold. I assume that to do this I need to:
- Sort the data by particle ID and time
- Perform (series - series.shift())
- Then write the results to a parquet file
- Search for the value I want
This works for small datasets (~100GB) on a machine with 64GB of memory. But when I move to the full-size data (2TB) on a machine with 500GB of memory, the job runs for 1-2 hours, reports a heartbeat from an unknown worker, then crashes. It does this every single time I try.
Do you have a suggestion? Ideally I would like to do something like
sorted = df.groupby['particle_id'].sort('time')
delta_mu_max = (sorted - sorted.shift()).max()
particle_of_interest = delta_mu_max[delta_mu_max > value]
but of course that doesn’t actually work.
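(For concreteness, on a single in-memory pandas frame the computation I have in mind would look roughly like the sketch below; particle_id, time, and mu are the column names and value is the threshold, so adapt as needed.)
import pandas as pd

# Rough pandas sketch of the intended computation, assuming the whole frame fits in memory
df = df.sort_values(["particle_id", "time"])

# Difference of mu between consecutive times, restarted at each particle boundary
delta_mu = df.groupby("particle_id")["mu"].diff().abs()

# Largest jump per particle, then the particles whose largest jump exceeds the threshold
delta_mu_max = delta_mu.groupby(df["particle_id"]).max()
particles_of_interest = delta_mu_max[delta_mu_max > value]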
Hi @bcaddy, welcome back!
A complete shuffle of a 2TB dataset is always complex! It should be possible if each particle group is small enough to fit in a worker's memory; is that the case for you?
First, we would need more insight into your current code and your Dask scheduler/cluster configuration. The stack trace from the crash might also be helpful, along with some information about your input data layout and format.
Some more things:
- Probably not possible to modify now, but it would be much easier if the data generation already wrote particles into different files…
- Try it step by step: what happens if you do the groupby and sort, then write to a parquet structure partitioned by particle?
- Why doesn't your pseudocode work?
Each particle group is small, 10-15k rows with 18-24 elements each; about 2-3MB total. I don’t actually care that the particle groups are in order, just that all rows for a given particle are grouped together and sorted by time within the group.
The input data is all in parquet files. I have a previous step where I take the raw binary files from the simulation and convert them to parquet; I can do some pre-sorting at this stage if that would be helpful. Each of these parquet files has all the data for all the particles in a given window of time.
There's another step before the sort where I compute new particle IDs, since before that particles are uniquely identified by a combination of three numbers, which is unwieldy.
Here's a link to the log file with errors and the Python script I'm currently using; it won't let me upload anything besides image files. The relevant sections of the Python program are lines 187-232. I'm running this on a university computing cluster. I've only run it on a single node and would prefer to keep it that way, but that isn't a hard requirement; if it would work better with enough nodes that the whole dataset fits in distributed memory, I can do that. I also have access to some large-memory nodes that could fit the whole dataset, though those are in high demand and queue times on them are long.
Other things:
- It would be much easier if each particle had its own file. The group I'm working with used to do that, but since particles can move between MPI ranks in a simulation, the end result was thousands of ranks opening hundreds of thousands of files, writing a few KB to them, then closing them every second, which was not good for the file system. I think there's probably a better way to do this in general, but it's well outside the scope of my current project.
- I'm working on testing a groupby-based method but it's not working yet. Yesterday when I tested it I got an error that the object groupby returns doesn't have a sort method.
I tried .groupby('particle_id').get_group((0,)) and it takes almost a minute to run with 4 workers on a 100GB dataset. That's not fast enough to scale up to 2TB and 750k particles.
I ran it for a couple hours and it didn’t finish even on the small dataset. No way that will work for a 2TB dataset.
Hi @bcaddy ,
This is definitely not a trivial challenge and I’m not sure I have the solution, but I have a couple of leads based on my experience.
I believe the relevant snippet of code that you shared in the drive is:
# Sort
shuffle_method = "p2p"
parquet_output = parquet_output.sort_values(
    by=["particle_id", "time"], shuffle_method=shuffle_method
)

# Compute delta mu
mu_shift = parquet_output["mu"].shift()
mu_shift = mu_shift.where(
    parquet_output["particle_id"].shift() == parquet_output["particle_id"]
)
parquet_output["delta_mu_abs"] = (parquet_output["mu"] - mu_shift).abs()
While what you say you’d like to have is something like:
sorted = df.groupby['particle_id'].sort('time')
delta_mu_max = (sorted - sorted.shift()).max()
particle_of_interest = delta_mu_max[delta_mu_max > value]
My unsorted list of tips/ideas:
- I don’t think you need to sort by particle_id: just sort by time and then when you later select by particle_id, each row group will be sorted correctly. You might also set the index to “time” then. Sorting by 750k particle ids might be quite expensive.
- persist() the data. I have a similar workflow (sorting data, re-assigning IDs to rows by groups, with some operation needing across-rows info as in shift()) in a smaller dataset, and persisting the data after sorts and other major operations sped up the execution time and reduced memory issues;
- Potential workaround: re-partition the data by particle id and operate on each of them separately using map_partitions(). You could have something like:
import dask.dataframe as dd

# Get particle ids
particle_ids = parquet_output["particle_id"].drop_duplicates().compute().tolist()

# Repartition such that each partition has all and only rows with one particle id
partitions_by_id = [
    parquet_output[parquet_output["particle_id"] == pid] for pid in particle_ids
]
partitions_by_id = [p.repartition(npartitions=1) for p in partitions_by_id]
partitions_by_id = dd.concat(partitions_by_id)
partitions_by_id = partitions_by_id.persist()
This ensures that each partition contains all and only the rows with a given particle id. While the load per thread/process might be imbalanced depending on the distribution of the number of rows per particle id, the advantage is that you can then easily call map_partitions() to operate in parallel on each partition (i.e. particle id) separately; see the sketch after this list. Note that each partition is loaded into memory as a pandas dataframe during map_partitions(), but you said each group of rows with a given particle_id is ~2-3MB so it should not be a problem.
I'd even try and perform the sort by time within the map_partitions() call instead of at the whole dask dataframe level.
You can look at the lines above as a groupby where you impose that each group is also a partition, which you probably need as you're using shift(); otherwise you'd get inconsistent results if rows with the same particle id are spread across more than one partition (and thus, threads).
- "Yesterday when I tested it I got an error that the object groupby returns doesn't have a sort method" – that's because neither dask's nor pandas' groupby object has a sort method; what you probably want is DataFrame.sort_values();
- I'm not familiar with dask's where() but this looks like trouble to me:
mu_shift = mu_shift.where(
    parquet_output["particle_id"].shift() == parquet_output["particle_id"]
)
dask's documentation says that it operates like numpy's nonzero(), which returns indices, not values, I believe. So in the next line (parquet_output["mu"] - mu_shift) you're subtracting indices from measured values to compute the delta? I might be 100% wrong on this.
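To make the map_partitions() idea above a bit more concrete, here is a rough, untested sketch of what the per-partition step could look like, building on the partitions_by_id dataframe from the snippet above. The helper name max_delta_mu is made up, the columns are assumed to be particle_id/time/mu with integer particle ids, and value is the threshold from the pseudocode:
import pandas as pd

def max_delta_mu(part: pd.DataFrame) -> pd.DataFrame:
    # Each partition is a plain pandas DataFrame holding (ideally) a single particle id,
    # so sorting and differencing here is cheap and purely local.
    part = part.sort_values("time")
    delta = part.groupby("particle_id")["mu"].diff().abs()
    return delta.groupby(part["particle_id"]).max().rename("delta_mu_max").reset_index()

# meta tells Dask the output schema so it does not have to guess it
meta = pd.DataFrame({"particle_id": pd.Series(dtype="int64"),
                     "delta_mu_max": pd.Series(dtype="float64")})
delta_mu_max = partitions_by_id.map_partitions(max_delta_mu, meta=meta).compute()
particles_of_interest = delta_mu_max[delta_mu_max["delta_mu_max"] > value]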
I hope some of this was helpful! Cheers
- I don’t think you need to sort by particle_id: just sort by time and then when you later select by particle_id, each row group will be sorted correctly. You might also set the index to “time” then. Sorting by 750k particle ids might be quite expensive.
How is sorting by time any different from sorting by particle ID? I've tried groupby().get_group() and it was prohibitively slow even on the small dataset I have for testing; would that change if the data was sorted by time?
- persist() the data. I have a similar workflow (sorting data, re-assigning IDs to rows by groups, with some operation needing across-rows info as in shift()) in a smaller dataset, and persisting the data after sorts and other major operations sped up the execution time and reduced memory issues;
The data doesn't fit in memory, so I can't use persist().
- Potential workaround: re-partition the data by particle id and operate on each of them separately using map_partitions(). You could have something like:
I tried this, and even on the small dataset it takes prohibitively long. It's been running for an hour and a half and hasn't even started to write any final files; the sorting method handled the same dataset in ~10 minutes.
I'm trying the sort method on the large dataset with a smaller number of processes; maybe Dask just doesn't like using all 96 cores on the machine.
Well, it would really help if you sorted/grouped records by particle, but I'm not sure how much time it would take… You could try it; it might be more efficient than using Dask sort or groupby.
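For instance, since each intermediate parquet file covers one window of time and presumably fits in memory, a per-file pre-sort at the conversion stage could be as simple as the hypothetical sketch below (the function name and paths are placeholders):
import pandas as pd

def presort_file(in_path: str, out_path: str) -> None:
    # Hypothetical per-file step at the binary -> parquet conversion stage
    df = pd.read_parquet(in_path)
    df = df.sort_values(["particle_id", "time"])
    df.to_parquet(out_path, index=False)
That alone does not regroup a particle's rows across files, but it should cluster them within each file so that later per-particle reads can skip most row groups.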
Actually, 750k particles means 750k groups, and thus potentially 750k tasks at some point, which is huge. If you could first group by batches of several particle ids, this could help.
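One illustrative way to do that (not from the thread, and assuming the re-computed particle ids are integers) is to derive a coarse bucket column and group on that first:
# Fold ~750k particle ids into ~1k buckets; all rows of a given particle share one bucket,
# so a later groupby/shuffle only has to deal with ~1k keys instead of 750k.
n_buckets = 1024
parquet_output["bucket"] = parquet_output["particle_id"] % n_buckets
per_bucket = parquet_output.groupby("bucket")  # each bucket is a modest pandas chunk in apply()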
This clearly looks like a memory error; the full shuffle is not working with your dataset size and available memory.
It would probably help, and might just work with enough memory, but this is not really what Dask is aiming at.
If Dask doesn't work here, you might have to do this at some point, even if it is expensive for the file system.
That doesn't seem that bad to me. I would try a groupby.apply, sort each chunk using pandas (each chunk passed to the function used in apply should be a pandas DataFrame), and then just to_parquet the result, to see if it correctly streams the data and manages to do the full shuffle.
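A minimal, untested sketch of that suggestion (the function name and output path are placeholders, and the columns are the particle_id/time/mu ones from the earlier snippet):
import pandas as pd

def sort_and_delta(group: pd.DataFrame) -> pd.DataFrame:
    # Each group arrives as an in-memory pandas DataFrame holding one particle id
    group = group.sort_values("time")
    group["delta_mu_abs"] = group["mu"].diff().abs()
    return group

# Dask will warn about inferring `meta` here; pass meta= explicitly to silence it
result = parquet_output.groupby("particle_id").apply(sort_and_delta)
result.reset_index(drop=True).to_parquet("sorted_with_delta/")  # placeholder output directory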
What did you run?
This is what groupby/apply should do. The problem of 750k partitions remains.
You need to find the correct balance between processes and threads per process here. Too many processes with not enough memory will probably not be efficient for a full shuffle.
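For concreteness, this is the kind of trade-off being described when starting a LocalCluster on a single 96-core, 500GB node; the numbers below are only an example, not a recommendation:
from dask.distributed import Client, LocalCluster

# Fewer, fatter workers: more memory per worker for the shuffle,
# with threads providing the parallelism inside each worker.
cluster = LocalCluster(
    n_workers=8,            # processes
    threads_per_worker=12,  # 8 * 12 = 96 cores in total
    memory_limit="60GB",    # per-worker limit, ~480GB in total
)
client = Client(cluster)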
The final solution I ended up with used a mixture of multiprocessing and Polars. Instead of loading all the different files and trying to sort everything at once, I did as much of the processing and sorting as I could on a per-file basis (parallelized with multiprocessing and using Polars for the computation), then scanned through the sorted data and collected blocks of particles such that an entire block fit into memory. This approach was about an order of magnitude faster than the Dask version, though presumably that was mostly because I could make some optimizations specific to my data that Dask couldn't know about. I really tried to get Dask to work, but I had constant memory or performance issues no matter what I did.
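A skeletal version of that pipeline, with made-up paths, helper names, and block logic standing in for the data-specific parts, would look something like this:
import multiprocessing as mp
import polars as pl

def process_one_file(path: str) -> None:
    # Per-file step: do whatever per-window processing is possible, then sort
    # by particle and time and write the file back out (output path is hypothetical).
    df = pl.read_parquet(path)
    df = df.sort(["particle_id", "time"])
    df.write_parquet(path.replace("raw", "sorted"))

if __name__ == "__main__":
    files = [...]  # list of per-time-window parquet files
    with mp.Pool(processes=16) as pool:
        pool.map(process_one_file, files)

    # Second pass: scan the sorted files lazily and pull out one block of
    # particle ids at a time, sized so that an entire block fits in memory.
    lazy = pl.scan_parquet("sorted/*.parquet")
    for id_block in blocks_of_particle_ids:  # hypothetical iterable of particle-id blocks
        block = (
            lazy.filter(pl.col("particle_id").is_in(id_block))
            .sort(["particle_id", "time"])
            .collect()
        )
        # ... compute delta mu and the per-particle maximum on `block` ...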