I have a custom graph with a lot of nodes to compute,
but limited memory, on the order of 1-2 TB.
Every time I run the tasks, it fails with an out-of-memory error.
Is there a common way to solve this problem, i.e. computing a big graph quickly with limited memory?
I have tried partitioning the big graph into smaller graphs, but many nodes get computed multiple times if the graph isn't divided well.
If partitioning is the right solution, how should I divide the graph suitably?
If not, how else could I solve this problem?
@Asuka Thanks for your question! Could you please share a minimal, reproducible example and some more details about your workflow & system? It'll allow us to help you better!
My code is too long, so I'll make up an example instead.
Here is a dask graph:
dsk = {"a1": pd.DataFrame, "a2": (add, "a1", pd.DataFrame), "a3": (add, "a2", pd.DataFrame), "a4": (add, "a3", "a2"), ... large tasks up to "ai"}
I initialize dask distributed with 1 TB of memory and 20 CPUs on a single machine.
I need all of a1, a2, ..., ai,
so I run the graph with get(dsk, [a1, a2, ..., ai]).
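To make this concrete, here is a small runnable sketch of that kind of graph (toy DataFrame sizes and node count; the cluster numbers are placeholders for the real 20-CPU / 1 TB setup):

```python
import pandas as pd
from operator import add
from dask.distributed import Client

def make_df():
    # Toy stand-in for the real DataFrames in the graph
    return pd.DataFrame({"x": range(1000)})

# Raw dask graph: each a{i} adds a fresh DataFrame to the previous result
dsk = {"a1": (make_df,)}
for i in range(2, 11):
    dsk[f"a{i}"] = (add, f"a{i-1}", (make_df,))

# Single-machine cluster; memory_limit is per worker
# (toy numbers here; the real setup is ~20 workers and ~1 TB in total)
client = Client(n_workers=4, memory_limit="2GB")

keys = [f"a{i}" for i in range(1, 11)]
results = client.get(dsk, keys)  # every key is requested, so every result is held
```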
When dask schedules the graph, this happens:
a1 is done, but a1 has downstream tasks, so a1 stays in memory and is not cleaned up.
a2 is done, but a1 and a2 have downstream tasks, so a1 and a2 stay in memory.
a3 is done and has no downstream tasks, so a3 is cleaned up; a1 and a2 are still in memory.
So at some point {ai ... aj} are all in memory, a new result needs space, and an error is returned: no space left on device.
I want to know how to solve this problem in a common way.
@Asuka Thanks for the details! I just want to confirm, which Dask scheduler are you using here? I recall you were using the dask-on-ray scheduler last time, if that's still the case, I'd encourage you to move this question to the Ray Community Discourse. But if you're using the distributed scheduler, I'll look into reproducing this!
Thank you @pavithraes !
I have tried both the distributed scheduler and the dask-on-ray scheduler, and there is the same problem.
I would use the distributed scheduler if it solved this problem, but it has the same issue.
@Asuka I'm not able to reproduce this, but I can keep looking into it. It'll also be helpful if you can share a locally-executable code example that shows this behavior.
Completed results are usually cleared from memory as quickly as possible in order to make room for more computation. The result of a task is kept in memory if either of the following conditions hold:
A client holds a future pointing to this task. The data should stay in RAM so that the client can gather the data on demand.
The task is necessary for ongoing computations that are working to produce the final results pointed to by futures. These tasks will be removed once no ongoing tasks require them.
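A minimal sketch of what that means in practice, assuming the distributed scheduler (the chain, key names, and sizes below are made up): gather each result back to the client as soon as it finishes and release its future, so a worker only has to hold a key while downstream tasks still need it.

```python
import pandas as pd
from dask.distributed import Client, as_completed

def make_df():
    return pd.DataFrame({"x": range(1000)})

def add(x, y):
    return x + y

client = Client()  # assumes a local distributed cluster

# Hypothetical chain a1 -> a2 -> ... -> a10, each step adding a fresh DataFrame
futures = []
prev = client.submit(make_df, key="a1")
futures.append(prev)
for i in range(2, 11):
    prev = client.submit(add, prev, make_df(), key=f"a{i}")
    futures.append(prev)

results = {}
for fut in as_completed(futures):
    results[fut.key] = fut.result()  # copy the data back to the client process
    fut.release()                    # this client no longer needs the key on the worker
```

Note that this trades worker memory for client memory, so it only helps if the client can hold (or immediately write out) the gathered results; keys that downstream tasks still depend on are kept by the scheduler until those tasks finish.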
In this case, my graph is big and complex and has many nodes like a1, a2, ..., ai, as I mentioned.
If a future points to ai, ai is kept in memory. When too many of them (a1, a2, ..., ak) have futures pointing to them and are kept in memory, the memory runs out.
What will the distributed scheduler do when this happens? Will it return a memory error?
I tested it a few more times, and it shows:
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
It restarts over and over again.
Are these elementwise operations among dataframes with the same number of rows? In other words, can your problem be reformulated as embarrassingly parallel on the row axis? In that case, you should use dask.dataframe instead of pandas.DataFrame and break the data into chunks.
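For illustration (hand-written here just to show the shape dask.dataframe builds for you; the chunk count of 4 and the key names are made up), the same chain split row-wise might look like this:

```python
import pandas as pd
from operator import add

nchunks = 4
full = pd.DataFrame({"x": range(1_000_000)})
size = len(full) // nchunks
chunks = [full.iloc[c * size:(c + 1) * size] for c in range(nchunks)]  # contiguous row blocks

# Chunked version of the chain: key (name, c) holds chunk c of node "name"
dsk = {}
for c in range(nchunks):
    dsk[("a1", c)] = chunks[c]
    dsk[("a2", c)] = (add, ("a1", c), chunks[c])
    dsk[("a3", c)] = (add, ("a2", c), chunks[c])
    # ... and so on up to ("ai", c)
```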
In this chunked layout, each element is 1/4th of the rows of yours. Since dask goes depth-first exactly for memory reasons, none of the elements of (*, 1), (*, 2), or (*, 3) will even be started until the whole (*, 0) tree is almost completed, and your peak RAM requirement will be reduced by a factor of 4. Increase the number of chunks as needed.
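In practice you would not write that graph by hand; a rough sketch of the dask.dataframe equivalent (made-up data, 4 partitions):

```python
import pandas as pd
import dask
import dask.dataframe as dd

pdf = pd.DataFrame({"x": range(1_000_000)})

# Each dask DataFrame is split into 4 row-wise partitions
a1 = dd.from_pandas(pdf, npartitions=4)
a2 = a1 + dd.from_pandas(pdf, npartitions=4)   # elementwise add, partition by partition
a3 = a2 + dd.from_pandas(pdf, npartitions=4)

# One compute call lets dask share intermediates and release each
# partition's inputs once their dependents have finished
a1_result, a2_result, a3_result = dask.compute(a1, a2, a3)
```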
Ultimately, if there's a specific feature of pandas.DataFrame that is missing in dask.dataframe and is a dealbreaker for you, we would like to hear about it - please file a ticket on the dask GitHub issue tracker.
Finally, you may also try xarray.Dataset with a dask backend.
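A minimal sketch of that idea (made-up data and chunk size):

```python
import numpy as np
import xarray as xr

# An xarray Dataset backed by dask arrays, chunked along the row dimension
ds = xr.Dataset({"x": ("row", np.arange(1_000_000))}).chunk({"row": 250_000})

a2 = ds + ds    # elementwise, chunk by chunk
a3 = a2 + ds

result = a3.compute()   # or write to disk, e.g. a3.to_netcdf("a3.nc")
```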