Hello! I wanted to ask because I’m probably missing some optimization.
I do the following:
import distributed
import dask.dataframe as dd
import pandas as pd

# Read in the data.
df = dd.read_parquet(... calculate_divisions=True, index='Time')
# I need to groupby('ProductCode').resample('1s') over sub-second data within each
# ProductCode group. The straightforward version was OOMing a lot, but written like
# this it isn't.
# Accumulator for the per-product results.
resampled = dd.from_pandas(pd.DataFrame(), npartitions=df.npartitions)
for key in df['ProductCode'].unique():
    per_product = df[df['ProductCode'] == key].reset_index().set_index(['Time'], sort=True).resample('1s').agg({
        'ProductCode': 'first',
        'a': 'first',
        'b': 'max',
        'c': 'min',
        'd': 'last',
        'e': 'sum',
        'f': 'sum',
        'g': 'max',
        'h': 'sum',
        'i': 'sum'}).dropna()
    distributed.print('Resampled {}'.format(key))
    resampled = dd.concat([resampled, per_product])
# Now write out the results
resampled.to_parquet(..., overwrite=True)
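For clarity, here is a sketch of an alternative shape for the same loop that I have been wondering about: collect the per-product lazy frames in a list and concat once at the end, instead of re-concatenating inside the loop (I assume the repeated concat nests the graph one level deeper per product). Column names and aggregations are the same as above; unique() is computed up front so the loop iterates over concrete keys.

agg_spec = {
    'ProductCode': 'first', 'a': 'first', 'b': 'max', 'c': 'min', 'd': 'last',
    'e': 'sum', 'f': 'sum', 'g': 'max', 'h': 'sum', 'i': 'sum'}

parts = []
for key in df['ProductCode'].unique().compute():  # concrete keys up front
    per_product = (df[df['ProductCode'] == key]
                   .reset_index().set_index(['Time'], sort=True)
                   .resample('1s').agg(agg_spec).dropna())
    parts.append(per_product)  # still lazy, nothing computed yet

resampled = dd.concat(parts)   # a single concat over all products
resampled.to_parquet(..., overwrite=True)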
My source data takes about 20 GB in Parquet on disk (2172 files of ~8 MB each). My machine has 12 cores and 128 GB of RAM.
I’ve been running this as distributed.Client(distributed.LocalCluster(memory_limit='20GB', n_workers=5)) and it crashes in the write-out step with “kernel is restarting”, which I take to mean out of memory. Same with memory_limit='20GB', n_workers=5, threads_per_worker=2. I have now downgraded to memory_limit='50GB', n_workers=2, threads_per_worker=1, and it is slowly writing out the results in the last step while using 80 GB of RAM.
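For completeness, the exact setups I tried, as code (as I understand it, memory_limit in LocalCluster is per worker, not a cluster-wide total):

import distributed

# First attempt: crashed during the write.
client = distributed.Client(distributed.LocalCluster(memory_limit='20GB', n_workers=5))
# Same crash with:
# client = distributed.Client(distributed.LocalCluster(memory_limit='20GB', n_workers=5, threads_per_worker=2))
# Currently running (slowly) with:
# client = distributed.Client(distributed.LocalCluster(memory_limit='50GB', n_workers=2, threads_per_worker=1))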
I added reset_index().set_index before resample because otherwise it was getting stuck on resample. I now suspect that my graph is big, and maybe that is why it sometimes takes a long time (20+ minutes) to start the computation.
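In case it helps, this is how I was thinking of checking that suspicion (assuming these introspection calls are the right ones to use):

print(resampled.npartitions)             # number of output partitions
print(len(resampled.__dask_graph__()))   # rough task count in the accumulated graph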
Is there a better strategy for this than reducing the number of workers so that each can get more RAM? Should I be persist()-ing things?
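Concretely, by persist()-ing I mean something like the following (assuming the source frame actually fits in cluster memory):

df = df.persist()   # materialize the read_parquet result on the workers once
# ... then run the per-product loop against the persisted frame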
Finally, does the last to_parquet step take so much memory because it is redoing the whole computation, starting from resample or even read_parquet?
This is the status page during to_parquet with two workers.