Hi, I am trying to read the last year of MongoDB data into a Dask Bag and then convert it into a Dask DataFrame using dask-mongo. Here is my code.
# Imports used by the snippets below
from datetime import datetime, timedelta, timezone
import pandas as pd
from dask.distributed import Client, LocalCluster
from dask_mongo import read_mongo

def process(self, record):
    # Keep only the fields needed downstream; records missing any field are skipped
    try:
        yield {
            "Login": record["Login"],
            "PriceOpen": record["PriceOpen"],
            "Symbol": record["Symbol"],
            "VolumeExt": record["VolumeExt"],
            "Date": record["Date"],
            "TimeCreate": record["TimeCreate"],
            "PositionId": record["PositionId"]
        }
    except KeyError:
        pass
def process_data(self, df):
    # Runs on each partition via map_partitions, so df here is a pandas DataFrame
    df['VolumeExt'] = df['VolumeExt'] / 10000000
    df['Value'] = df['VolumeExt'] * df['PriceOpen']
    # Use pandas' to_datetime rather than dd.to_datetime, since this operates on pandas objects
    df['Date_Convert'] = pd.to_datetime(df['Date'], unit='s', origin='unix', utc=True)
    df['TimeOfTrade'] = pd.to_datetime(df['TimeCreate'], unit='s', origin='unix', utc=True).dt.time
    df = df.drop(['VolumeExt', 'PriceOpen', 'Date', 'TimeCreate'], axis=1)
    return df
def read_mongodb_data(self):
    '''
    Reading training data from MongoDB using dask-mongo
    '''
    cluster = LocalCluster(n_workers=5)
    client = Client(cluster)
    # time.sleep(2)
    # Replace this with your URI connection
    host_uri = self.config["MONGO-DB-LIVE"]["CONNECTION-STRING"]
    # Query for the last one year (midnight UTC boundaries, as epoch seconds)
    end_date = datetime(datetime.now().year, datetime.now().month, datetime.now().day, 0, 0, tzinfo=timezone.utc).timestamp()
    back_date = datetime.now() - timedelta(days=365)
    start_date = datetime(back_date.year, back_date.month, back_date.day, 0, 0, tzinfo=timezone.utc).timestamp()
    query = {"Date": {"$gte": start_date, "$lt": end_date}}
    dask_bag = read_mongo(
        connection_kwargs={"host": host_uri},
        database=self.config["MONGO-DB-LIVE"]["DB-NAME"],
        collection=self.config["MONGO-DB-LIVE"]["POSITION-COLLECTION-NAME"],
        chunksize=5000,
        match=query
    )
    # Get the records into a flat form suitable for conversion into a Dask DataFrame
    dask_bag_flattened = dask_bag.map(self.process).flatten()
    # Convert the Dask Bag to a Dask DataFrame
    dask_frame = dask_bag_flattened.to_dataframe()
    return dask_frame
raw_trades = self.read_mongodb_data()  # 10-10-2023
print('Read Data from DB..')
dask_frame = raw_trades.map_partitions(self.process_data)
all_positions = dask_frame.compute()  # All records in a single pandas DataFrame
all_positions = all_positions.drop_duplicates()
print('Data fetching and pre-processing complete.')
On executing the Python script, it gives me the following warning and takes a long time (over 5 minutes) to finish. Is there any other way to speed it up? Oddly, reading the whole DB without any query is still faster than reading just one year's records with the query.
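For reference, one thing I could check is whether the Date field is indexed; if it is not, the range filter forces a full collection scan, which would be consistent with the filtered read being slower than reading everything. Below is a minimal diagnostic sketch, assuming pymongo is available; host_uri, start_date, end_date and the database/collection names are placeholders mirroring the config values used above.

from pymongo import MongoClient, ASCENDING

# Placeholders standing in for the values read from self.config
mongo = MongoClient(host_uri)
coll = mongo["<DB-NAME>"]["<POSITION-COLLECTION-NAME>"]

print(coll.index_information())  # does an index on "Date" already exist?
# explain() reports COLLSCAN (full scan) vs IXSCAN (index scan) for the same filter
print(coll.find({"Date": {"$gte": start_date, "$lt": end_date}}).explain())

# If only the default _id index is present, adding one on "Date" lets the
# range filter use an index scan instead of scanning every document
coll.create_index([("Date", ASCENDING)])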