Memory issues while loading big MongoDB collections

Afternoon… I’m trying to use Dask instead of Spark in my script, but I’m having problems with my calculations using Dask.

I have three big collections in a MongoDB database, and I’m trying to left join them. Combined, these 3 collections are approximately 55 GB.

So what I’m trying to do is:

  1. Load these 3 collections individually from Mongo, keeping only 4 columns from each and limiting Dask to use at most 25 GB of my RAM;
  2. Left join these 3 collections on the same column (for example, on column “THIS_COLUMN”);
  3. Insert this new collection back into my MongoDB.

Issues I’m having:

  1. I can’t limit the memory usage. Dask is using all my memory, and I can’t allow that because I have multiple processes running on the same computer;
  2. I can’t load the collections while staying under that limit;
  3. I can create a client with a memory limit, but I couldn’t find a way to use this client to load my collections.

What I tried that failed:

  1. Loading the data with partitions. It still uses all my memory.

Can someone help me? Here is my Spark logic:

    coll1 = SPARK.read.format("mongo").option("uri", f'{URI}').load()\
        .select("THIS_COLUMN", "COLL1", "COLL2")

    coll2 = SPARK.read.format("mongo").option("uri", f'{URI}').load()\
        .select("THIS_COLUMN", "COLL3", "COLL4", "COLL5")

    coll3 = SPARK.read.format("mongo").option("uri", f'{URI}').load()\
        .select("THIS_COLUMN", "COLL6", "COLL7")

    final_coll = coll2\
        .join(coll1, "THIS_COLUMN", how='left')\
        .join(coll3, "THIS_COLUMN", how='left')

Hi @lefigueiredo, welcome to Dask Discourse!

If you want to limit the memory used by Dask, and still be able to process data bigger than that memory in a streaming fashion, you’ll need to use a Distributed cluster (e.g. a LocalCluster if you’re on a single machine) and have your data loaded in partitions.

It seems you already tried to use the Client API, but did not manage to use it. Creating a Client alone is just a shortcut for creating a LocalCluster, so you should be able to go with it. As soon as a Client is created in your main script, Dask will use it by default to run computations; you shouldn’t have anything more to do.

So before your computation, you should do something like:

    from dask.distributed import Client

    client = Client(memory_limit='25GB', n_workers=1)

Note that you could use several worker processes; memory_limit applies per worker, so memory_limit * n_workers will be the total amount of memory used by Dask.
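
For example (the numbers here are just an illustration), this explicit form splits the same overall budget across several workers; a Client created without an address builds a LocalCluster like this under the hood:

    from dask.distributed import Client, LocalCluster

    # 4 worker processes at 6GB each: Dask will use at most ~24GB in total.
    cluster = LocalCluster(n_workers=4, memory_limit='6GB')
    client = Client(cluster)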

It would also help if you could show the current state of your code using Dask.

Hi there, maybe you’ll find dask-mongo useful: GitHub - coiled/dask-mongo

Hi, thanks for the replies…
So, I actually used dask-mongo to load my data from Mongo; apparently it loads each collection as a Dask Bag, which I then manipulate in my script to do the joins.
This is my first time using Dask, so maybe I’m doing something wrong. Maybe by showing my full code you can help me fix my logic:

    from dask_mongo import read_mongo
    from dask.distributed import Client
    import numpy as np

    client = Client(memory_limit='8GB', n_workers=1)
    host_uri = "mongodb://localhost:27017/TEST_DF?retryWrites=true&w=majority"

    b_emp = read_mongo(
        connection_kwargs={"host": host_uri},
        database="RECEITA2",
        collection="RF_EMP",
        chunksize=500,
    )

    b_estab = read_mongo(
        connection_kwargs={"host": host_uri},
        database="RECEITA2",
        collection="RF_ESTAB",
        chunksize=500,
    )

    b_sim = read_mongo(
        connection_kwargs={"host": host_uri},
        database="RECEITA2",
        collection="RF_SIM",
        chunksize=500,
    )        
    
    rf_emp = b_emp.to_dataframe(columns=['CNP', 'POR', 'NAT'])

    rf_estab = b_estab.to_dataframe(columns=['CNP', 'SIT', 'CNA'])
        
    rf_sim = b_sim.to_dataframe(columns=['CNP','OPC','MEI'])

    rf_final = rf_estab\
        .merge(rf_emp, on='CNP', how='left')\
        .merge(rf_sim, on='CNP', how='left')

    rf_final = rf_final.drop_duplicates()
    rf_final = rf_final.replace({np.nan: None}) # Force nulls into Mongo (Spark removed None from records)

I don’t know if this is the best way to do it, but it was the only way I found to load from Mongo without exhausting all my memory.
My last line adds this rf_final to a different collection in Mongo. And that is the problem: no matter which approach I take, somehow all the memory gets used.

The code looks OK to me. You want to be careful with chunksize, though, to avoid generating too many partitions (and a really complex task graph for Dask).
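
For example (illustrative numbers only), a larger chunksize gives fewer, bigger partitions, and you can also coarsen a dataframe before the merges:

    b_emp = read_mongo(
        connection_kwargs={"host": host_uri},
        database="RECEITA2",
        collection="RF_EMP",
        chunksize=50_000,  # fewer, larger partitions than chunksize=500
    )

    # Or reduce the partition count of an existing Dask DataFrame:
    rf_emp = rf_emp.repartition(npartitions=100)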

What is this last line? Since you’ve declared a Client above, the memory used by your processing should never exceed 8 GB; otherwise it should raise a MemoryError.
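
If you’re writing back with dask-mongo, I’d expect that last line to look roughly like the sketch below (the output collection name is only a placeholder, to_bag(format="dict") needs a reasonably recent Dask, and you should check the dask-mongo README for to_mongo’s exact arguments):

    from dask_mongo import to_mongo

    to_mongo(
        rf_final.to_bag(format="dict"),  # merged dataframe as a bag of record dicts
        database="RECEITA2",
        collection="RF_FINAL",  # placeholder output collection
        connection_kwargs={"host": host_uri},
    )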

I see two things to investigate here:

  1. Try to follow the computation you’re launching on the Dask Dashboard. You can get its address from your client object (on a local machine it defaults to 127.0.0.1:8787).
  2. How does your task graph look? How many tasks have been generated by your workflow? (See the snippet below for a quick way to check.)
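
A minimal way to check both, assuming the client and rf_final from your code above:

    print(client.dashboard_link)             # Dashboard address, e.g. http://127.0.0.1:8787/status locally
    print(rf_final.npartitions)              # Number of partitions after the merges
    print(len(rf_final.__dask_graph__()))    # Rough number of tasks Dask will have to schedule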