How to make Dask Asynchronous work with http requests

Hi all,

I am using Dask to make a large number of database queries via http, and then parsing and joining the results together.
My current setup uses a blocking local client and works, but when the number of queries is very large I can see from the dashboard that a thread makes only one request at a time and waits for the result.
I am new to concurrent programming but I have the impression that making the http requests asynchronous (with asyncio or tornado.httpclient) would allow me to make all the requests at the same time and not blocking a thread waiting for the server to respond. Is this the case?

What would be a minimal example of a script that makes a large number of http requests and then some basic operations on the results (without awaiting all responses)?

I have a minimal working example (I substituted the http endpoint of my database with an exchange API just to show it), how would I adapt it to be asynchronous?
Thanks a lot in advance!

import requests
import numpy as np
import pandas as pd
from dask.distributed import Client, LocalCluster, as_completed
import dask.dataframe as dd
import time


def fetch_html(pair):
    req_string = 'https://www.bitstamp.net/api/v2/order_book/{currency_pair}/'
    time.sleep(1)
    response = requests.get(req_string.format(currency_pair=pair))
    try:
        result = response.json()
        return result
    except Exception as e:
        print('Error: {}\nMessage: {}'.format(e,response.reason))
        return None


def parse_result(result):
    if result:
        data = {}
        data['prices'] = [e[0] for e in result['bids']]
        data['vols'] = [e[1] for e in result['bids']]
        data['index'] = [result['timestamp'] for i in data['prices']]
        df = pd.DataFrame.from_dict(data).set_index('index')
        return df
    else:
        return pd.DataFrame()


def other_calcs(result):
    if not result.empty:
        # something
        return result
    else:
        return pd.DataFrame()


def aggregator(res1, res2):
    if (not res1.empty) and (not res2.empty):
        # something
        return res1
    elif not res2.empty:
        # something
        return res2
    elif not res1.empty:
        return res1
    else:
        return pd.DataFrame()


if __name__=='__main__':
    pairs = [ 
        'btcusd',
        'btceur',
        'btcgbp',
        'bateur',
        'batbtc',
        'umausd',
        'xrpusdt',
        'eurteur',
        'eurtusd',
        'manausd',
        'sandeur',
        'storjusd',
        'storjeur',
        'adausd',
        'adaeur',
        # etc, a lot more than available threads
             ]

  cluster = LocalCluster(n_workers=2, threads_per_worker=2) 
  client = Client(cluster)
  futures_list = client.map(fetch_html, pairs)
  futures_list = client.map(parse_result, futures_list)
  futures_list = client.map(other_calcs, futures_list) 
  seq = as_completed(futures_list)
  while seq.count() > 1:
      f1 = next(seq)
      f2 = next(seq)
      new = client.submit(aggregator, f1, f2, priority=1)
      seq.add(new)
  final = next(seq)
  final = final.result()
  print(final.head())

Is the lack of answer because I am asking a wrong question? Can someone clarify?

@dDd Welcome! And, apologies for the delay in response.

The parallelism niches for Dask and asyncio are different. Dask is great for parallel compute heavy workloads and asyncio is great for parallel I/O (like your HTTP requests). So, we would recommend fetching the files outside of Dask and then using Dask for the computations.

Usually, when we use async functions, we want to run something else simultaneously. In your example, this would mean you’re doing some other operation while the URLs are being fetched. However, all the other functions depend on fetch_html, which is another reason why fetching asynchronously outside of Dask might be a good idea. :slight_smile:

If you still do want to use asyncio with Dask for the fetching operation, you can do something like:

async def fetch_html(list_urls):
async def f(url):
    # simulates fecth
    await asyncio.sleep(1)
    return 1
results = await asyncio.gather(*[f(u) for u in list_urls])
return results

%%time # Wall time: 1.04 s
futures = client.submit(fetch_html, [1, 2, 3])
client.gather(futures)

Note that asyncio is single threaded, and we’re running it on all the threads. So, we pass a list of URLs to fetch_html – each thread fetches the URLs passed to it in an async fashion.

Hope this helps!

Thanks for the answer!
The reason to use asyncio is that requests can take a long (and uneven) time to be answered, so I might as well do some post-processing (that is embarassingly parallel) on the result.

1 Like