How to configure a Dask cluster based on my workload?

Reading a Stack Overflow answer from Matthew Rocklin about how to configure workers and threads, I saw he mentioned that it depends on the workload.

What I am trying to speed up with Dask is the collection of files that are available to us through a REST API layer. I have a pandas DataFrame with the location of a file in each row, and I call a Dask DataFrame .apply() with a function that performs the request, unzips the file in memory, and extracts what is needed from it, returning a pandas DataFrame as the result.
My compute node has 12 CPU cores.

My expectation is to launch as many parallel tasks as possible to retrieve the files. How can I know the Dask cluster's limit? Or should I reconsider the Dask DataFrame .apply() approach?
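For reference, the per-row function looks roughly like this (a sketch: the names parse_zip_bytes and fetch_and_parse, the `url` column, and the CSV-inside-zip layout are assumptions, not my exact code):

```python
import io
import zipfile

import pandas as pd
import requests


def parse_zip_bytes(content):
    """Open a zip archive held in memory and read its first member as CSV."""
    with zipfile.ZipFile(io.BytesIO(content)) as zf:
        name = zf.namelist()[0]  # assume one file of interest per archive
        with zf.open(name) as f:
            return pd.read_csv(f)


def fetch_and_parse(url):
    """Download one file via the REST layer and extract what is needed."""
    resp = requests.get(url, timeout=60)
    resp.raise_for_status()
    return parse_zip_bytes(resp.content)


# ddf is the Dask DataFrame built from the pandas DataFrame of locations:
# results = ddf["url"].apply(fetch_and_parse, meta=("url", object))
```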

Hi @lgonzalezsa,

Considering your use case, I’d say that using processes or threads won’t make much difference. Since calling an external address to download a file is an IO-bound operation, one request should not block the others in multithreaded mode.

In any case, Dask probably won’t be the bottleneck here. There are several things to take into consideration: the size of your input DataFrame (how many REST API calls will you perform?), the size of the individually downloaded files, the overall bandwidth, and the performance of the server you’re contacting (you could easily mount an unintentional DDoS attack on it with Dask).

I’d say that if you have twelve cores available, you should:

  • make sure that you divide your input DataFrame into at least 12 chunks when reading it,
  • start a local cluster with 12 threads,
  • if you observe some sequential processing, retry with 12 processes.

This should download and process 12 files in parallel. You can always split things further and launch more threads or processes if you feel your hardware is underused.


Thank you for your response @guillaumeeb. Let me describe what I am doing so far:

  1. I partition my input pandas DataFrame and convert it into a Dask DataFrame. The input typically varies from 0.1K to 100K records, so I start with a partition size of 100.
  2. I experimented with the Dask cluster definition and found the best performance with 12 workers and 12 threads per worker. I executed the same workload with a variety of workers (4, 8, 12, 24) and threads (4, 8, and 12) to find the “best config”.
  3. I added extra logging to analyze concurrency in my custom function that downloads, opens, and parses the data. Using dask.distributed.get_worker().log_event() I am able to monitor the concurrency of the internal functions called during .apply(); so far I have not been able to see more than 45 concurrent tasks in the same second during the execution of the workload.
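The logging wrapper looks roughly like this (a sketch: the topic name, the wrapper names, and the no-op fallback outside a worker are my own additions for illustration):

```python
import time

from dask.distributed import get_worker


def log_event_safe(topic, msg):
    """Log an event on the current worker; no-op outside a Dask task."""
    try:
        get_worker().log_event(topic, msg)
    except ValueError:  # get_worker() raises ValueError outside a worker
        pass


def timed_fetch(url, fetch):
    """Wrap the download function with start/end events so concurrency
    can be reconstructed from the event log afterwards."""
    log_event_safe("downloads", {"url": url, "phase": "start", "ts": time.time()})
    result = fetch(url)
    log_event_safe("downloads", {"url": url, "phase": "end", "ts": time.time()})
    return result


# Later, client.get_events("downloads") returns the logged events; counting
# overlapping start/end pairs per second gives the observed concurrency.
```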

With this information, the Dask diagnostic report and the Dask dashboard itself focused my attention on bandwidth (~30 megabits/s) and CPU usage (on average not more than 15%). I am looking for opportunities to improve, certainly with no intention of DDoSing the host providing the REST interface, but trying to identify a healthy threshold to maximize performance.

I’d appreciate any other recommendations on how to speed up the execution of the workload described.

From what you say, I suppose that the bottleneck here is the host providing the REST interface. If you increase the number of workers/threads and still find that you never reach more than 45 concurrent tasks, this is probably the reason, especially since your local CPU usage is low. Your code is just waiting for the remote server’s answer/download. You’ll always get some latency from it, so you probably won’t get better performance.

But be aware that all I’m saying is just a guess :smile: !
