Hi all,
I am using Dask to make a large number of database queries via http, and then parsing and joining the results together.
My current setup uses a blocking local client and works, but when the number of queries is very large I can see from the dashboard that a thread makes only one request at a time and waits for the result.
I am new to concurrent programming, but I have the impression that making the HTTP requests asynchronous (with asyncio or tornado.httpclient) would let me issue all the requests at once, without blocking a thread while waiting for the server to respond. Is that correct?
What would be a minimal example of a script that makes a large number of http requests and then some basic operations on the results (without awaiting all responses)?
I have a minimal working example below (I substituted my database's HTTP endpoint with a public exchange API just for illustration). How would I adapt it to be asynchronous?
Thanks a lot in advance!
import requests
import numpy as np
import pandas as pd
from dask.distributed import Client, LocalCluster, as_completed
import dask.dataframe as dd
import time
def fetch_html(pair):
    """Fetch the live order book for one currency pair from the Bitstamp API.

    Parameters
    ----------
    pair : str
        Currency-pair slug, e.g. ``'btcusd'``.

    Returns
    -------
    dict or None
        The decoded JSON payload, or None when the response body is not
        valid JSON.
    """
    req_string = 'https://www.bitstamp.net/api/v2/order_book/{currency_pair}/'
    # Crude rate limiting so we do not hammer the public endpoint.
    time.sleep(1)
    # A timeout prevents a worker thread from hanging forever on a dead or
    # unresponsive server (requests has no default timeout).
    response = requests.get(req_string.format(currency_pair=pair), timeout=30)
    try:
        result = response.json()
        return result
    except ValueError as e:
        # .json() raises ValueError (json.JSONDecodeError) on a bad body;
        # catching only that avoids hiding unrelated programming errors.
        print('Error: {}\nMessage: {}'.format(e, response.reason))
        return None
def parse_result(result):
    """Convert a raw order-book JSON dict into a DataFrame indexed by timestamp.

    Falls back to an empty DataFrame when the fetch produced no payload.
    """
    if not result:
        return pd.DataFrame()
    bids = result['bids']
    timestamp = result['timestamp']
    frame = pd.DataFrame({
        'prices': [bid[0] for bid in bids],
        'vols': [bid[1] for bid in bids],
        'index': [timestamp] * len(bids),
    })
    return frame.set_index('index')
def other_calcs(result):
    """Placeholder for additional per-pair computation.

    Passes a non-empty DataFrame straight through; an empty input yields a
    fresh empty DataFrame.
    """
    if result.empty:
        return pd.DataFrame()
    # something
    return result
def aggregator(res1, res2):
    """Combine two partial results into one.

    Placeholder combiner for the tree reduction: when both frames carry
    data the first is kept, when only one does that one is kept, and two
    empty inputs produce an empty DataFrame.
    """
    if res1.empty and res2.empty:
        return pd.DataFrame()
    if res1.empty:
        # something
        return res2
    if res2.empty:
        return res1
    # something
    return res1
if __name__=='__main__':
    pairs = [
        'btcusd',
        'btceur',
        'btcgbp',
        'bateur',
        'batbtc',
        'umausd',
        'xrpusdt',
        'eurteur',
        'eurtusd',
        'manausd',
        'sandeur',
        'storjusd',
        'storjeur',
        'adausd',
        'adaeur',
        # etc, a lot more than available threads
    ]
    # 2 processes x 2 threads = at most 4 requests in flight: each blocking
    # fetch_html call occupies one worker thread for its full duration.
    cluster = LocalCluster(n_workers=2, threads_per_worker=2)
    client = Client(cluster)
    # Chain the stages as futures of futures: each map consumes the previous
    # stage's futures, so parse_result/other_calcs for a pair run as soon as
    # that pair's fetch completes.
    futures_list = client.map(fetch_html, pairs)
    futures_list = client.map(parse_result, futures_list)
    futures_list = client.map(other_calcs, futures_list)
    # Tree reduction: repeatedly take the two earliest-finished results,
    # submit their aggregation, and feed that future back into the pool.
    # priority=1 schedules aggregations ahead of default-priority tasks.
    seq = as_completed(futures_list)
    while seq.count() > 1:
        f1 = next(seq)
        f2 = next(seq)
        new = client.submit(aggregator, f1, f2, priority=1)
        seq.add(new)
    # Exactly one future remains: the fully aggregated DataFrame.
    final = next(seq)
    final = final.result()
    print(final.head())