If you want to limit the memory used by Dask and process data larger than that memory in a streaming fashion, you'll need to use a Distributed cluster (e.g. a LocalCluster if on a single machine) and have your data loaded in partitions.
It seems you already tried the Client API, but did not manage to get it working. Creating a Client on its own is just a shortcut for using a LocalCluster, so you should be able to go with it. As soon as a Client is created in your main script, Dask will use it by default to launch computations; you shouldn't have anything more to do.
So before your computation, you should do something like:
from dask.distributed import Client
client = Client(memory_limit='25GB', n_workers=1)
Note that you could use several worker processes, but keep in mind that memory_limit is per worker, so memory_limit * n_workers will be the total amount of memory used by Dask.
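For example, here is a minimal sketch splitting the same overall budget across several workers (the numbers are just placeholders):

from dask.distributed import Client

# 4 worker processes x 6GB each -> Dask will use at most ~24GB in total
client = Client(n_workers=4, memory_limit='6GB')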
It would also help if you could show the current state of your code using Dask.
Hi, thanks for the replies…
So, I actually used dask-mongo to load my data from MongoDB. Apparently it loads the collection as a Dask Bag, so I manipulate it in my script to do the joins.
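The loading part is essentially this (a simplified sketch using dask-mongo's read_mongo; the database, collection, and connection details are placeholders):

from dask_mongo import read_mongo

# Lazily load the collection as a Dask Bag, chunksize documents per partition
bag = read_mongo(
    connection_kwargs={"host": "localhost", "port": 27017},
    database="my_database",
    collection="my_collection",
    chunksize=500,
)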
This is my first time using Dask, so maybe I'm doing something wrong. Maybe by showing my full code you can help me fix my logic:
I don't know if this is the best way to do it, but it was the only way I found to load from mongo without exhausting all my memory.
My last line was to add this rf_final to a different collection in mongo. And that is the problem: no matter which approach I take, somehow all the memory gets used.
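Simplified, that last step looks something like this (assuming dask-mongo's to_mongo helper; the target names are placeholders):

from dask_mongo import to_mongo

# Write the final bag out to a different collection
to_mongo(
    rf_final,
    database="my_database",
    collection="results_collection",
    connection_kwargs={"host": "localhost", "port": 27017},
)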
The code looks OK to me. You want to be careful with chunksize, to avoid generating too many partitions (and a really complex task graph for Dask).
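For instance, you can check how many partitions your bag ended up with (using your rf_final name here):

print(rf_final.npartitions)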
What is this last line? As you've declared a Client above, the memory used by your processing should never exceed 8GB; otherwise it should raise a MemoryError.
I see two things to investigate here:
Try to follow the computation you're launching on the Dask Dashboard. You can get its address from your client object (if on a local machine, it defaults to 127.0.0.1:8787); see the snippet after this list.
How does your task graph look? How many tasks have been generated by your workflow?
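For example (dashboard_link is a standard Client attribute; counting the graph keys is one rough way to gauge the number of tasks, again using your rf_final name):

# Address of the diagnostics dashboard
print(client.dashboard_link)  # e.g. http://127.0.0.1:8787/status

# Rough number of tasks in the graph behind rf_final
print(len(rf_final.__dask_graph__()))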