Custom Graphs memory limit

I have a custom graph with a lot of nodes to compute, but only limited memory, e.g. 1 TB or 2 TB.
It fails with a memory limit error every time I run the tasks.
Is there a common way to solve this problem, i.e. compute a big graph quickly with limited memory?

I have tried to partition the big graph into smaller graphs, but many nodes end up being computed multiple times if the graph is not divided well.

If partitioning is the right solution, how should I divide the graph suitably?
If not, how else could I solve this problem?

Is there any paper about this?

@Asuka Thanks for your question! Could you please share a minimal, reproducible example and some more details about your workflow & system? It’ll allow us to help you better!

My code is too long, so I'll make up an example instead.

Here is a dask graph:

dsk = {
    'a1': pd.DataFrame,
    'a2': (add, 'a1', pd.DataFrame),
    'a3': (add, 'a2', pd.DataFrame),
    'a4': (add, 'a3', 'a2'),
    ...  # a large number of tasks, up to 'ai'
}

I initialize dask distributed on a single machine with 1 TB of memory and 20 CPUs.
I need all of a1, a2, ..., ai, so I run the graph with get(dsk, ['a1', 'a2', ..., 'ai']).
When dask schedules the graph, this happens:
a1 is done, but a1 has downstream tasks, so a1 stays in memory and is not released.
a2 is done, but a1 and a2 have downstream tasks, so a1 and a2 stay in memory.
a3 is done and has no downstream tasks, so a3 is released; a1 and a2 are still in memory.
So at some point {ai ... aj} are all held in memory, a new result needs to go into memory, and then it returns an error: no space left on device.
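A minimal sketch of this setup (the sizes, the chain length, and the add / make_df helpers below are illustrative, not my real code):

import numpy as np
import pandas as pd
from dask.distributed import Client

def add(x, y):
    return x + y

def make_df():
    return pd.DataFrame(np.random.random((1_000_000, 10)))

n = 50  # chain length; increase until memory pressure appears
dsk = {'a1': (make_df,)}
for i in range(2, n + 1):
    dsk[f'a{i}'] = (add, f'a{i-1}', (make_df,))

client = Client(n_workers=20)
# Requesting every key forces all results to be held in memory at once.
results = client.get(dsk, [f'a{i}' for i in range(1, n + 1)])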

I want to know how to solve this kind of problem in a general way.

@Asuka Thanks for the details! I just want to confirm, which Dask scheduler are you using here? I recall you were using the dask-on-ray scheduler last time, if that’s still the case, I’d encourage you to move this question to the Ray Community Discourse. But if you’re using the distributed scheduler, I’ll look into reproducing this!

Thank you @pavithraes!
I have tried both the distributed and dask-on-ray schedulers; the problem is the same with both.
I would happily use the distributed scheduler if it solved this problem, but it doesn't.

@Asuka I’m not able to reproduce this, but I can keep looking into it. It’ll also be helpful if you can share a locally-executable code example that shows this behavior.

Some of these docs might be helpful for you:

Let me know if the links help!

@pavithraes I have read them many times.

From the distributed memory management docs:

Completed results are usually cleared from memory as quickly as possible in order to make room for more computation. The result of a task is kept in memory if either of the following conditions hold:

  1. A client holds a future pointing to this task. The data should stay in RAM so that the client can gather the data on demand.
  2. The task is necessary for ongoing computations that are working to produce the final results pointed to by futures. These tasks will be removed once no ongoing tasks require them.
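For example, here is an illustrative sketch of condition 1 (not my real code): as long as the client holds a future, the result stays in worker memory; once the future is released, the scheduler is free to drop it.

from dask.distributed import Client, wait

client = Client()

def inc(x):
    return x + 1

f1 = client.submit(inc, 1)
f2 = client.submit(inc, f1)
wait(f2)

# f1's result is only kept because the client still holds a future to it.
f1.release()          # or `del f1`: drops the client's reference
print(f2.result())    # only f2's result still needs to stay in worker memory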

In this case, my graph is big and complex, with many nodes like a1, a2, ..., ai, as I mentioned.

If ai has an outgoing future, ai is kept in memory. When too many of them (a1, a2, ..., ak) have outgoing futures and are kept in memory, the memory runs out.

What will the distributed scheduler do when this happens? Will it return a memory error?
I tested it a few more times, and it shows:
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting.
The worker restarts over and over again.
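For reference, an illustrative sketch (not from this thread): the 95% figure in that warning corresponds to the distributed worker's terminate threshold, which, together with the spill and pause thresholds, can be inspected or adjusted through Dask's configuration. The values and the per-worker memory_limit below are only examples.

import dask
from dask.distributed import Client

dask.config.set({
    "distributed.worker.memory.target": 0.6,      # start spilling to disk
    "distributed.worker.memory.spill": 0.7,       # spill more aggressively
    "distributed.worker.memory.pause": 0.8,       # pause accepting new tasks
    "distributed.worker.memory.terminate": 0.95,  # the nanny restarts the worker
})

client = Client(n_workers=20, memory_limit="50GB")  # per-worker limit, illustrative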

Are these elementwise operations among dataframes with the same number of rows? In other words, can your problem be reformulated as embarrassingly parallel on the row axis? In that case, you should use dask.dataframe instead of pandas.DataFrame and break the data into chunks.

e.g. your graph would become

dsk = {
    ('a1', 0): pd.DataFrame,
    ('a1', 1): pd.DataFrame,
    ('a1', 2): pd.DataFrame,
    ('a1', 3): pd.DataFrame,
    ('a2', 0): (add, ('a1', 0), pd.DataFrame),
    ('a2', 1): (add, ('a1', 1), pd.DataFrame),
    ('a2', 2): (add, ('a1', 2), pd.DataFrame),
    ('a2', 3): (add, ('a1', 3), pd.DataFrame),
}

where each element is 1/4th of the rows of yours. Since dask goes depth-first exactly for memory reasons, none of the elements of (…, 1), (…, 2), or (…, 3) will even be started until the whole (…, 0) tree is almost complete, so your peak RAM requirement will be reduced by a factor of 4. Increase the number of chunks as needed.
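The same idea expressed through the dask.dataframe API rather than a hand-written graph (an illustrative sketch; the data and npartitions=4 are placeholders):

import numpy as np
import pandas as pd
import dask.dataframe as dd

pdf = pd.DataFrame(np.random.random((1_000_000, 10)))

a1 = dd.from_pandas(pdf, npartitions=4)        # 4 row-wise chunks
a2 = a1 + dd.from_pandas(pdf, npartitions=4)   # elementwise, chunk by chunk
a3 = a2 + dd.from_pandas(pdf, npartitions=4)

# Each chunk flows through the whole chain largely independently, so peak
# memory is roughly 1/4 of the unpartitioned version.
result = a3.compute()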


Some pandas APIs are not supported by dask.dataframe, like reindex, groupby, between_time, and so on.

Is there any way to keep using pd.DataFrame and avoid this?

For reindex, have you tried looking at this?

groupby is supported.

For between_time, have you tried looking at this?
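As an aside, an illustrative sketch (not one of the linked docs): pandas-only methods such as between_time can often be applied per chunk with map_partitions, since each partition of a dask DataFrame is a plain pandas DataFrame. The data below is made up.

import pandas as pd
import dask.dataframe as dd

pdf = pd.DataFrame(
    {"x": range(48)},
    index=pd.date_range("2022-01-01", periods=48, freq="H"),
)
ddf = dd.from_pandas(pdf, npartitions=4)

# Each partition is a plain pandas DataFrame, so any pandas method is available.
filtered = ddf.map_partitions(lambda part: part.between_time("09:00", "16:00"))
print(filtered.compute())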

Ultimately, if there’s a specific feature of pandas.DataFrame that is missing in dask.dataframe and is a dealbreaker for you, we would like to hear about it - please file a ticket on the Dask GitHub issue tracker.

Finally, you may also try xarray.Dataset with a dask backend.


@Asuka Just checking in, were you able to get this working?

@pavithraes Thanks!
I have been working on a new project these days, and I think it works fine now.

Thank you and @crusaderky!
