Using dask-mongo to read into a Dask DataFrame gives a runtime memory warning

Hi, I am trying to read MongoDB data for the past year into a Dask Bag and subsequently into a Dask DataFrame using dask-mongo. Here’s my code.

from datetime import datetime, timedelta, timezone

import dask.dataframe as dd
from dask.distributed import Client, LocalCluster
from dask_mongo import read_mongo


def process(self, record):
    # Keep only the fields needed downstream; skip records missing any of them
    try:
        yield {
            "Login": record["Login"],
            "PriceOpen": record["PriceOpen"],
            "Symbol": record["Symbol"],
            "VolumeExt": record["VolumeExt"],
            "Date": record["Date"],
            "TimeCreate": record["TimeCreate"],
            "PositionId": record["PositionId"],
        }
    except KeyError:
        pass

def process_data(self, df):
    # Per-partition pre-processing applied via map_partitions
    df['VolumeExt'] = df['VolumeExt'] / 10000000
    df['Value'] = df['VolumeExt'] * df['PriceOpen']
    df['Date_Convert'] = dd.to_datetime(df['Date'], unit='s', origin='unix', utc=True)
    df['TimeOfTrade'] = dd.to_datetime(df['TimeCreate'], unit='s', origin='unix', utc=True).dt.time
    df = df.drop(['VolumeExt', 'PriceOpen', 'Date', 'TimeCreate'], axis=1)
    return df

def read_mongodb_data(self):
    '''
    Reading training data from MongoDB using dask-mongo
    '''
    cluster = LocalCluster(n_workers=5)
    client = Client(cluster)
    # time.sleep(2)
    # Replace this with your URI connection
    host_uri = self.config["MONGO-DB-LIVE"]["CONNECTION-STRING"]

    # Query for the last one year (midnight UTC boundaries, as epoch seconds)
    end_date = datetime(datetime.now().year, datetime.now().month, datetime.now().day, 0, 0, tzinfo=timezone.utc).timestamp()
    back_date = datetime.now() - timedelta(days=365)
    start_date = datetime(back_date.year, back_date.month, back_date.day, 0, 0, tzinfo=timezone.utc).timestamp()
    query = {"Date": {"$gte": start_date, "$lt": end_date}}

    dask_bag = read_mongo(
        connection_kwargs={"host": host_uri},
        database=self.config["MONGO-DB-LIVE"]["DB-NAME"],
        collection=self.config["MONGO-DB-LIVE"]["POSITION-COLLECTION-NAME"],
        chunksize=5000,
        match=query,
    )
    # Getting the records into shape for conversion to a Dask DataFrame
    dask_bag_flattened = dask_bag.map(self.process).flatten()

    # Converting the Dask Bag to a Dask DataFrame
    dask_frame = dask_bag_flattened.to_dataframe()
    return dask_frame

# Calling code (elsewhere in the same class)
raw_trades = self.read_mongodb_data()  # 10-10-2023
print('Read Data from DB..')
dask_frame = raw_trades.map_partitions(self.process_data)
all_positions = dask_frame.compute()  # All records in a single pandas DataFrame
all_positions = all_positions.drop_duplicates()
print('Data fetching and pre-processing complete.')

On executing the Python script, it gives me the following warning and takes a long time (over 5 minutes) to execute. Is there any way to speed it up? When I read the whole DB without a query, the process is still comparatively faster than reading it with a query for just one year’s records.

Hi @n-srinidhi, welcome to Dask community!

First, what’s inside the read_mongo function?

Then, did you have a look at the Dask Dashboard to get some insight into what might be going on? Are you generating a lot of really small tasks?

Could you elaborate on what you mean by that?

If your records all fit in memory, do you need to use Dask?

Hi. The read_mongo function is reading from a MongoDB database using dask_mongo. I process the data and save it into a Dask DataFrame to be processed further.

When I read the whole DB without a query, the process is still comparatively faster than reading it with a query for just one year’s records

Here, I meant that I have another script with the exact same code, but it reads the entire collection of around 5 million records. It takes some time, but it executes.
In this case, it takes longer and there seems to be a memory leak.

all_positions = dask_frame.compute()

I’m using this to condense all fetched records into a single pandas DataFrame. Since we are reading over a million records, Dask is the best way I could find to do so without running out of memory while reading from MongoDB.

The order of execution is as follows (a condensed sketch follows the list):

  1. Read records from a MongoDB database using dask-mongo.
  2. dask_frame = raw_trades.map_partitions(self.process_data)
    This function pre-processes the data read from MongoDB. I’m using map_partitions hoping to apply the pre-processing efficiently, partition by partition, on the same Dask DataFrame.
  3. all_positions = dask_frame.compute()
    Convert the Dask DataFrame into a pandas DataFrame and process it further.
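Condensed, those three steps boil down to the following (a sketch reusing the names from the snippets above, inside the same class as the earlier methods; it is not the author's exact code):

raw_trades = self.read_mongodb_data()                       # step 1: dask-mongo read into a Dask DataFrame
dask_frame = raw_trades.map_partitions(self.process_data)   # step 2: per-partition pre-processing
all_positions = dask_frame.compute()                        # step 3: gather into a single pandas DataFrame
all_positions = all_positions.drop_duplicates()             # further processing happens on the pandas side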

Then, did you have a look at the Dask Dashboard to get some insight into what might be going on? Are you generating a lot of really small tasks?

While executing in a Jupyter notebook, this runs smoothly. But when I put it in a Python script, I’m not able to visualize the graphs and the said issues occur.

This is weird: looking at the dask_mongo code, I can’t see why it would use more memory with a query, but I’m no Mongo expert. I can understand that fetching is comparatively faster without a query, even though a database like Mongo should optimize this.

Looking at the code you provided, I don’t see any filtering of the data when using Dask, except maybe the selection of record attributes in process? Do the documents have more attributes than that? What I mean is, I find it strange to use Dask if all the documents fit in memory.
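For illustration, if the one-year slice does fit in memory, a plain pymongo query with a server-side filter and a projection of just the needed fields might be enough. This is only a sketch under that assumption; the connection URI, database and collection names below are placeholders, not from the original code:

from datetime import datetime, timedelta, timezone

import pandas as pd
from pymongo import MongoClient

# Placeholder connection details; replace with the real URI, DB and collection
client = MongoClient("mongodb://localhost:27017")
collection = client["my_db"]["positions"]

# Same one-year window as the Dask version (epoch seconds, midnight UTC)
now = datetime.now()
end_date = datetime(now.year, now.month, now.day, tzinfo=timezone.utc).timestamp()
back = now - timedelta(days=365)
start_date = datetime(back.year, back.month, back.day, tzinfo=timezone.utc).timestamp()

# Filter server-side and project only the fields used downstream
query = {"Date": {"$gte": start_date, "$lt": end_date}}
projection = {"_id": 0, "Login": 1, "PriceOpen": 1, "Symbol": 1,
              "VolumeExt": 1, "Date": 1, "TimeCreate": 1, "PositionId": 1}

df = pd.DataFrame(collection.find(query, projection))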

This is really weird: the exact same code, with the same number of processes in the LocalCluster, the same chunksize, on the same machine, results in different behavior between a notebook and a plain Python script? Anyway, you should be able to print the dashboard address in the script and take a look at what is happening.
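For example, something like this should print the dashboard address from a plain script (a minimal sketch assuming the same LocalCluster/Client setup as in the snippet above):

from dask.distributed import Client, LocalCluster

cluster = LocalCluster(n_workers=5)
client = Client(cluster)

# Open this URL in a browser while the script runs to watch tasks and memory
print("Dask dashboard:", client.dashboard_link)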