How does da.histogram() work?

Hello,

I am trying to find the fastest way to compute a 1D histogram, and I have come across some strange scaling behaviour of dask.array.histogram(). I timed histograms with 50 bins over random samples of up to 3e8 numbers for numpy, numba, dask.delayed and dask.array.histogram; this is what I got:

[figure: histogram_speeds]

My questions are:

  1. Why is dask.array.histogram slower than numpy?
  2. What is the origin of those weird “jumps” in dask.array.histogram speed?
  3. Is there a way to use Dask to parallelize numpy.histogram?

The code that generated the data:

import numpy as np
import dask 
import dask.array as da
from numba import njit
from time import time

N_samples = 45
Nbins = int(5e1)
Ns = np.linspace(Nbins, 3e8, N_samples)

times = np.zeros((4,N_samples))
for i, N in enumerate(Ns):
    print(i)
    N = int(N)
    x = np.random.sample(N)
    
    @njit
    def hist_numba(x):
        return np.histogram(x, Nbins)
    
    @dask.delayed
    def hist_dask(x):
        return np.histogram(x, Nbins)

    t0 = time()
    np.histogram(x)  # plain NumPy call
    times[0, i] = time() - t0

    t1 = time()
    hist_numba(x)  # Numba-jitted NumPy call
    times[1, i] = time() - t1

    t2 = time()
    hist_dask(x).compute()  # dask.delayed wrapper around the NumPy call
    times[2, i] = time() - t2

    x_d = da.from_array(x)
    t3 = time()
    dask.compute(da.histogram(x_d, Nbins, range=[0, 1]))  # native dask.array.histogram
    times[3, i] = time() - t3

Thanks for your advice.

Hey,

Interesting post! I was intrigued and decided to play with your script. Thanks for the clear and concise example.
While I am not very familiar with the native Dask implementation of the histogram function, I think I can answer some of the questions you posed.

First of all, the delayed implementation is as performant as the Numpy implementation because it is simply calling the Numpy implementation. The difference is that the Numpy call is executed right away, whereas the delayed one is only executed when you call compute(). A delayed function call runs in a single thread. You will also find that the curve of the delayed implementation is nearly identical to that of the jitted function if you call delayed on the jitted function. I imagine there is some minor overhead involved in creating the delayed object, but your data shows it to be negligible.
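For illustration, a small self-contained sketch of that last point (the sample size and bin count here are arbitrary):

import numpy as np
import dask
from numba import njit

@njit
def hist_numba(x, nbins=50):
    return np.histogram(x, nbins)

x = np.random.sample(int(1e7))

# Wrapping the jitted function in dask.delayed defers the exact same
# single-threaded call until .compute() is invoked.
counts, edges = dask.delayed(hist_numba)(x).compute()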

  1. Honestly, I am unsure why the scaling of the Dask implementation is so much steeper than that of the plain Numpy implementation. Maybe someone else in the community has a thought on this. At first I thought it might be overhead, but I would expect that to scale with the number of tasks, or maybe the number of workers, and that does not seem to be the relation here. If anyone has thoughts on this I would be very interested to read them as well.

  2. I think the jumps are related to the number of threads. Imagine a function call takes one second. If you have 10 tasks and 10 threads, the parallelized operation takes around one second. If you have 11 tasks and 10 threads, it takes around two seconds: one task can only be picked up in the second round, once a thread has finished its earlier calculation. With 10 threads, 20 tasks will therefore take roughly as long as 11 tasks.

This combines with the way Dask chunks the array x_d. When Dask chunks an array, it creates as many chunks as it needs, where each chunk gets the maximum allowed chunk size, except possibly the last one. For the lower N values, this means x_d contains only one chunk. This is the rapid rise you see at the start before it caps out: each of the first few runs with increasing N has only one chunk, and that one chunk grows larger, with steep linear scaling. At some point, when N is a bit larger, the first chunk reaches its maximum size and a new chunk is created. Here we reach the first plateau in the graph. From this point on, when N grows, either the last chunk grows or a new chunk is created; either way the duration is unaffected, since the first maxed-out chunk acts as the bottleneck. This cap persists until there are more chunks than available threads to pick up the work, at which point the next rapid increase happens. Rinse and repeat. The toy model below illustrates the effect.
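A rough back-of-the-envelope model of that behaviour (my own illustration, not the actual Dask scheduler; the per-element cost is made up):

import math

# Wall time is roughly the number of "rounds" of work, ceil(n_chunks / n_threads),
# times the cost of processing one (full-size) chunk.
def rough_wall_time(n_elements, max_chunk_size, n_threads, per_element_cost=1e-8):
    n_chunks = math.ceil(n_elements / max_chunk_size)
    per_chunk_cost = min(n_elements, max_chunk_size) * per_element_cost
    rounds = math.ceil(n_chunks / n_threads)
    return rounds * per_chunk_cost

for n in (5e6, 2e7, 1e8, 2e8, 3e8):
    print(f"N={n:.0e}: ~{rough_wall_time(int(n), int(1e7), 10):.2f} s")

This reproduces the shape of the curve: a steep rise while there is a single growing chunk, a plateau while spare threads absorb new chunks, and a jump each time the number of chunks exceeds a multiple of the thread count.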

Since the maximum chunk size determines the size of the step in the graph, I ran your script with x_d = da.from_array(x, chunks=1e7), which resulted in smaller maximum chunks than chunks='auto' picked on my machine. The smaller maximum chunk size resulted in more, smaller steps (as expected):
[figure: chunksize_6e6]
Decreasing the chunk size even further, to 1e6, resulted in slightly better overall performance than the Numpy implementation:
[figure: chunksize_1e6]
A single chunk for all sizes of N results in absolutely terrible performance for the Dask implementation (referring to question 1, I have no clue why the performance of the single-threaded Dask implementation is so poor):
[figure: chunksize_1e9]

There is a tradeoff where the steeper increase is counteracted by the plateau. The length of the plateau scales with the number of available threads. When I ran this on a machine with 64 available threads, which is more than the number of tasks we reach even with an N of 1e9, I could create such a long plateau that the Dask performance dropped below even the Numba performance.
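If you want to experiment with this yourself, the chunk size can be set explicitly when creating the Dask array (the value below is just an example; tune it for your own machine and thread count):

import numpy as np
import dask.array as da

x = np.random.sample(int(3e7))
x_d = da.from_array(x, chunks=int(1e7))  # three chunks of 1e7 elements each
print(x_d.chunks)      # ((10000000, 10000000, 10000000),)
print(x_d.numblocks)   # (3,)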

  3. Since a Dask implementation of the histogram function already exists, I don’t think writing a custom implementation based on the Numpy histogram function is the way to go. In fact, the Dask implementation uses the Numpy implementation under the hood: dask/routines.py at 2023.5.1 · dask/dask · GitHub
    A custom implementation does not seem worth the time and energy to me. I suggest you either use the native Dask histogram implementation (a minimal example follows below) or not use Dask for this.
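A minimal example of the native implementation, following the same call pattern as your script (sizes chosen arbitrarily):

import numpy as np
import dask.array as da

# da.histogram returns a lazy counts array plus the bin edges.
x_d = da.from_array(np.random.sample(int(1e7)), chunks=int(1e6))
counts, edges = da.histogram(x_d, bins=50, range=[0, 1])
print(len(edges))        # 51 edges for 50 bins
print(counts.compute())  # triggers the chunked, parallel computation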

P.S.
One modification you could make is to use da.random.random_sample(N) instead of

x = np.random.sample(N)
x_d = da.from_array(x)

With the former, you are not limited by the number of samples you can hold in memory, since the Dask random implementation is already chunked. This is more a scalability matter than a performance one, though.
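A sketch of that fully lazy variant (chunk size again arbitrary):

import dask.array as da

# The samples are generated chunk by chunk, so the full array never has to
# fit in memory at once.
x_d = da.random.random_sample(int(3e8), chunks=int(1e7))
counts, edges = da.histogram(x_d, bins=50, range=[0, 1])
print(counts.compute())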

Hi @Galedon1, welcome to the Dask community!

Hi @TMillenaar, thanks for the detailed and precise answer.

So I think that @TMillenaar is right about everything, to sum it up:

  • The delayed implementation is just a lazy Numpy call, hence the same performance.
  • Dask performance is bound by the number of chunks versus the number of available threads, hence the plateau effect.
  • Dask starts to be really interesting when the data is too big to fit in memory, as demonstrated by @TMillenaar at the end of their post.

The last bullet is a major point to keep in mind: if your data fits in memory and your computation takes on the order of a second, just stick with Numpy.

However, I’m also a bit surprised by the Dask results, especially with one chunk. We could expect some overhead, but the graph produced by @TMillenaar is surprising: why is Dask with one chunk so much slower, and linearly slower, than Numpy?

Also, I had a go with the multi-processing local scheduler and the distributed scheduler, but I ran into a problem:

import numpy as np
import dask.array as da

x = np.random.sample(int(4e8))
xd = da.from_array(x)
h, bins = da.histogram(xd, 5, range=[0, 1])
h.compute(scheduler='processes')

makes my Jupyter kernel crash, even though the data is not that big, so I’m not sure why.


Thanks @guillaumeeb for the kind words.

About the poor linear performance of the Dask implementation, it seems there are two issues here:

First, @Galedon1, the script contains a bug: Nbins is supplied to the Dask histogram call but not to the Numpy histogram call. The Numpy default is 10 bins and you specified 50, so this was not a fair comparison.
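For example (the corrected comparison, with an arbitrary sample size):

import numpy as np

Nbins = 50
x = np.random.sample(int(1e7))
np.histogram(x, Nbins)  # fair: same 50 bins that da.histogram receives
np.histogram(x)         # what the original script timed: Numpy's default of 10 bins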

That said, numpy.histogram is more performant when bins are specified as an integer rather than as an array. While I did not check the Numpy source code, I imagine this is because shortcuts can be taken when the bins are known to be equally spaced, whereas bins supplied as an array can have arbitrary spacing. Anyway, Dask converts the bins from an integer into an array here:
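Roughly, that conversion amounts to the following (my paraphrase, not the verbatim Dask source):

import numpy as np

# When bins is an integer and a range is supplied, explicit bin edges are
# built up front and then passed on to np.histogram for every chunk.
nbins, hist_range = 50, (0, 1)
bin_edges = np.linspace(hist_range[0], hist_range[1], nbins + 1, endpoint=True)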

That is what makes the biggest difference:

import numpy as np
from timeit import timeit

x = np.random.sample(int(4e8))
nbins = 10
bins = np.linspace(0, 1, nbins + 1)  # explicit edges for the same nbins bins
timeit(lambda: np.histogram(x, nbins), number=5) # 21.261210839264095
timeit(lambda: np.histogram(x, bins), number=5) # 142.5214459085837

@guillaumeeb What do you think? Is this int → array conversion worth raising as a Dask github issue?

I decided to go ahead and make a pull request; let’s see if it gets accepted.


Wow, thanks a lot @TMillenaar for the follow-up and the issue you opened!