TypeError on da.argmax when executing compute

import dask.array as da
import numpy as np
import pandas as pd

size = 10000
x_vals = da.linspace(0, 1, 1000)
test = da.random.uniform(low=0, high=1, size=(size, 4, 1000), chunks=(size // 10, 1, 1000))

def simple_convolve(x, y):
    temp_lst = []
    for i in range(x.shape[0]):
        a = da.fft.rfft(x[i])
        b = da.fft.rfft(y[i])
        conv_res = da.fft.irfft(a * b, n = size)
        temp_lst.append(conv_res)
    res = da.stack(temp_lst, axis=0)
    return res

res = da.map_blocks(simple_convolve, test[:,0], test[:,1], dtype='float32')

da.argmax(res, axis=1).compute()

The last line gives error:

TypeError: '>=' not supported between instances of 'str' and 'int'

Since the error says I’m comparing a string and an integer, I checked that res has no NaNs and no infinite values:

da.isnan(res).sum().compute().compute()
0

(~da.isfinite(res)).sum().compute().compute()
0

I also checked the mins and maxes across all timestamps, and nothing looks strange there:

min_ = da.min(res, axis=1).compute().compute()
pd.Series(min_).describe()

max_ = da.max(res, axis=1).compute().compute()
pd.Series(max_).describe()

Converting res to a NumPy array and then running np.argmax works fine:

np.argmax(res.compute().compute(), axis=1)

Possibly a bug?

Hi @velosipednikov, welcome to Dask discourse!

So I tried your reproducer, and it indeed gave me the same error.

However, I noticed one thing: the function you’re giving to map_blocks returns Dask arrays. That means you’re building a Dask array whose chunks are themselves Dask arrays, which is unusual and seems to be the cause of your issue.

The function passed to map_blocks takes a NumPy array as input, and usually returns a NumPy array.

So you shouldn’t use da. calls inside it. As I’m not sure about the arithmetic behind all that, I just modified the function to return a NumPy array, like so:

def simple_convolve(x, y):
    temp_lst = []
    for i in range(x.shape[0]):
        a = da.fft.rfft(x[i])
        b = da.fft.rfft(y[i])
        conv_res = da.fft.irfft(a * b, n=size)
        temp_lst.append(conv_res)
    res = da.stack(temp_lst, axis=0)
    return res.compute()  # return a NumPy array

Then, everything works fine.
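As a side note, I think this is also why you needed two compute calls earlier: with the original da.-based function, each block of res is itself a lazy Dask array, so the first compute only assembles those lazy blocks. Something like this (run against your original res) should show it:

# With the original da.-based simple_convolve, the blocks of res are
# presumably lazy Dask arrays rather than NumPy arrays.
first = res.compute()     # likely still a dask.array.Array at this point
print(type(first))
second = first.compute()  # only now do we get concrete numbers
print(type(second))       # expected: <class 'numpy.ndarray'>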

Thanks for the welcome and reply!

Ok, interesting, that likely explains why I had to run .compute() twice in order to get the actual data.

However, now I’m running compute on every group that I feed in. Admittedly, in this case that doesn’t happen many times (just 7 groups), but I was under the impression that it’s better to leave a single compute at the end if possible. That was the motivation behind how I did it.

I must say that, as a newcomer, understanding this seems to require trial and error. Is there a good explanation of what happens when we run compute? It seems like a black box.

Also, you wrote that map_blocks takes a NumPy array as input, but the API says the input is a ‘dask array or other object’.

I wasn’t being clear enough: you shouldn’t use compute inside the simple_convolve function as I did; you should try to work only with NumPy inside it.

You are right that you’ll need the compute at the end to trigger the map_blocks and the following computations. Dask will then apply your function to each chunk of your input data. Without it, nothing will happen.

This one above is not so bad, but you have plenty of other resources in the documentation.
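To give a rough picture in the meantime: calls on Dask collections only build a task graph describing the work to do, and compute is what actually executes that graph and hands back concrete NumPy data. A minimal sketch:

import dask.array as da

x = da.random.uniform(size=(1000, 1000), chunks=(100, 1000))
lazy = (x + 1).sum(axis=1)   # nothing is executed yet, this only builds a task graph
print(lazy)                  # a lazy dask.array, no numbers computed so far

result = lazy.compute()      # the graph is executed here
print(type(result))          # <class 'numpy.ndarray'>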

Yup, sorry, my mistake, I didn’t put it the right way: the function you give as an argument to map_blocks should take a NumPy array as input and return a NumPy array.

I’m not following: how would I work with only NumPy within the simple_convolve function?
At the point where map_blocks is called, I have 2 dask arrays that are 10K by 1K that are chunked into 100 by 1K chunks. They are passed to simple_convolve using map_blocks.

Whether they are now numpy or dask arrays within simple_convolve, I don’t know.
Are you proposing that I manually convert them to NumPy arrays in there? Does that also mean I shouldn’t use Dask’s fft functions for the convolution?

The chunks are Numpy arrays. Inside map_blocks, your function is applied to each of these Numpy Array chunks.

Well, inside this function, they are Numpy arrays.

As you are working with chunks, you shouldn’t have to use Dask’s fft functions in there.
I don’t really understand the science behind all that, but you either want to use Dask’s functions on the whole Dask array, or NumPy functions from within the function you pass to map_blocks.
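To make that concrete, here is a minimal sketch of what a NumPy-only version of your function might look like, simply swapping da.fft for np.fft and keeping your arithmetic (including n=size) unchanged; I haven’t checked the maths, only that each block returned is a plain NumPy array:

import numpy as np
import dask.array as da

size = 10000
test = da.random.uniform(low=0, high=1, size=(size, 4, 1000), chunks=(size // 10, 1, 1000))

def simple_convolve(x, y):
    # x and y arrive here as NumPy chunks, so plain NumPy FFTs are enough
    temp_lst = []
    for i in range(x.shape[0]):
        a = np.fft.rfft(x[i])
        b = np.fft.rfft(y[i])
        temp_lst.append(np.fft.irfft(a * b, n=size))
    return np.stack(temp_lst, axis=0)  # one NumPy array per block

res = da.map_blocks(simple_convolve, test[:, 0], test[:, 1], dtype='float32')
da.argmax(res, axis=1).compute()       # a single compute now returns plain NumPy data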

This is very helpful!

NumPy’s version of convolve is slower than this FFT approach, so I want to call another library’s convolve function. In order to do that, do I need to somehow manually pass this library to each worker if I run my flow after initiating a Client, or would the workers have access to all the libraries I load before the tasks are distributed?

In a LocalCluster setup (which is what you get under the hood when you use the Client constructor directly), you shouldn’t have any problem, as everything is started from the same Python environment.

There are some exceptions when using multiprocessing or distributed clusters, where some objects from an underlying library are not serializable.

Just try with an import in your main code, and if it doesn’t work, do the import inside the function you’re using as the map_blocks argument.
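As a rough sketch of what I mean, assuming you end up going through pyFFTW’s NumPy-compatible interface (pyfftw.interfaces.numpy_fft, which mirrors np.fft), and with test and size as in your first post:

import dask.array as da

def simple_convolve(x, y):
    # Doing the imports here means every worker resolves the library itself
    import numpy as np
    from pyfftw.interfaces import numpy_fft  # assumes pyFFTW is installed in the workers' environment

    temp_lst = []
    for i in range(x.shape[0]):
        a = numpy_fft.rfft(x[i])
        b = numpy_fft.rfft(y[i])
        temp_lst.append(numpy_fft.irfft(a * b, n=size))
    return np.stack(temp_lst, axis=0)

res = da.map_blocks(simple_convolve, test[:, 0], test[:, 1], dtype='float32')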

The problem is that I get an error using that particular library (PYFFTW) that’s hard to trace, so I’m trying to understand whether that’s because the workers aren’t seeing the library or for some other reason.

I have this at the top of my code:
client = Client(n_workers=4, threads_per_worker=1)

Could you share a minimal reproducer for this particular problem (encapsulating the call to PYFFTW inside a client.submit call, for example)?
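Something along these lines would already help; this is just a skeleton of what I have in mind (the function name and the toy input are made up):

import numpy as np
from dask.distributed import Client

client = Client(n_workers=4, threads_per_worker=1)

def fft_on_worker(arr):
    # run a single pyFFTW transform on a worker to see whether the import or the call fails there
    from pyfftw.interfaces import numpy_fft
    return numpy_fft.rfft(arr)

future = client.submit(fft_on_worker, np.random.random(1000))
print(future.result()[:5])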

It works with PYFFTW so whatever error I had before was because of my errors in syntax / logic. Thanks for your help!
