How to improve the processing speed

Hello, as I need to handle very large matrices, there is a memory shortage error when using numpy. Therefore, I have chosen to use dask.array to store matrices, but I am very unfamiliar with dask.array. When processing data, I found that dask.array is very slow. Is there any way to improve the speed? Thank you very much. Below is my code,the picture is processing time

import BWStest as bws
import numpy as np
from skimage.measure import label
import dask.array
from tqdm import tqdm


def BWS(Xarrar, Yarray, threshold):
  """Simpler but accelerated version of BWStest
    Compute the Baumgartner-weiB-Schindler test statistic for two-sample case.

  Args:
      Xarrar (np.ndarray): A height by width matrix, each column corresponds to a sample for each obs
      Yarray (np.ndarray): A height by width matrix, each column corresponds to a sample for each obs
      threshold (float): Significance level, only .05 (Default) and 0.01 are supported.

  Returns:
    H:    0 => Accept the null hypothesis at significance level ALPHA
          1 => Reject the null hypothesis at significance level ALPHA

    [1] A Nonparametric Test for the General Two-Sample Problem, W. Baumgartner, P. Weiß and H. Schindler
    Biometrics Vol. 54, No. 3 (Sep., 1998), pp. 1129-1135 


    This toolbox can be used only for research purposes, you should cite 
    the aforementioned papers in any resulting publication.

  """

  n, m = Xarrar.shape
  rank = rankdata(np.vstack((Xarrar, Yarray)), axis=0)
  xrank = np.sort(rank[0:n,:], axis=0)
  yrank = np.sort(rank[n::,:], axis=0)
  temp = np.arange(1, n+1)[:,np.newaxis]*np.ones((1,m))
  tempx = (xrank - 2.0*temp) ** 2
  tempy = (yrank - 2.0*temp) ** 2
  temp  = temp/(n+1) * (1-temp/(n+1)) * 2 * n 
  BX    = 1/n * np.sum(tempx / temp, axis=0)
  BY    = 1/n * np.sum(tempy / temp, axis=0)
  # test statistic
  B = 1/2*(BX + BY)
  if threshold ==0.05:
    if n == 5:
        b = 2.533
    elif n == 6:
        b = 2.552
    elif n == 7:
        b = 2.620
    elif n == 8:
        b = 2.564   
    elif n == 9:
        b = 2.575     
    elif n == 10:
        b = 2.583
    else:
        b = 2.493
  else:
    b = 3.880

  H = (B >=b)
  H = np.transpose(H)
  return H

# stack = np.random.rand(1, 12000, 20000)
CalWin = [7,25]
stack = []
thershold = 0.05
for i in range(5):

  image = np.random.rand(12000, 20000)
  dask_image = dask.array.from_array(image, chunks=(2048, 20000*2048))
  stack.append(dask_image)
stacks = dask.array.stack(stack, axis=0)

print(stacks)
npages,nlines,nwidths = np.shape(stacks)
RadiusRow=(CalWin[0] - 1) // 2
RadiusCol=(CalWin[1] -1 ) // 2
InitRow=(CalWin[0] + 1) // 2
InitCol=(CalWin[1] + 1) // 2

mlistack = np.pad(np.abs(stacks), ((0, 0), (RadiusRow, RadiusRow), 
                            (RadiusCol, RadiusCol)), mode='symmetric').compute()
meanmli = np.mean(mlistack, axis=0)
    # print(meanmli.compute())

nlines_EP,nwidths_EP = meanmli.shape
PixelInd = dask.array.zeros((CalWin[0] * CalWin[1],nlines*nwidths),
                                chunks=(CalWin[0] * CalWin[1],2048*nlines), dtype=np.bool)

num = 0
p = 0
all = nlines*nwidths
all_step = np.floor(all/10)
# compar_pixel = np.zeros((CalWin[0] * CalWin[1],nlines*nwidths))
compar_pixel = dask.array.zeros((CalWin[0] * CalWin[1],nlines*nwidths),
                                chunks=(CalWin[0] * CalWin[1],2048*nlines), dtype=np.bool)
  
for kk in tqdm(range(InitCol, nwidths_EP-RadiusCol), desc="Processing columns", ascii=' ='):
    
    for ll in tqdm(range(InitRow, nlines_EP-RadiusRow), desc="Processing rows", leave=False,ascii=' ='):
        
        Matrix = mlistack[:,ll-RadiusRow-1:ll+RadiusRow,kk-RadiusCol-1:kk+RadiusCol]
        # print(Matrix)
        Ref = Matrix[:,InitRow-1,InitCol-1]
        Xarray = np.tile(Ref[:, np.newaxis], (1, CalWin[0] * CalWin[1]))
        Matrix_T = np.transpose(Matrix, (0, 2, 1))
        Yarray = (np.reshape(Matrix_T, (Matrix_T.shape[0], CalWin[0] * CalWin[1])))

        # bws is my own module for BWS algorithm
        T = bws.BWS(Xarray, Yarray, thershold)
        # print(~T)
        SeedPoint = np.transpose(np.reshape(~T, (CalWin[1], CalWin[0])))
      

        ll = np.transpose(SeedPoint)
        l = label(ll,2)
        l_flat = l.flatten() 
        LL = label(SeedPoint, 2)
    
        LL_flat = np.transpose(LL).flatten()
        
        # compar_pixel.compute()
        compar_pixel[:,num] = (LL_flat == LL[InitRow - 1, InitCol - 1])
        PixelInd[:,num] = (LL_flat == LL[InitRow - 1, InitCol - 1])
        
    
        num = num + 1


BroNum = np.sum(PixelInd, 0)

BroNum = np.reshape(BroNum[::], [nlines, nwidths], order='F')

BroNum = np.float32((BroNum-1)) 
  
CalWin = CalWin

image

Hi @kjz1997, welcome to Dask Discourse!

First of all, you code is really complex to give like that and to optimize on a public forum. Could you try to minimize it to the problem you are facing?

I’ll go with a few comments though:

You don’t want to go through Numpy first when building Dask Array, not a good practice in general. Secondly, why are you using a chunk shape with a second dimension bigger than the array shape?

Just below, you’re computing the array, so turning it back to a Numpy one. There was no memory issue then? Building dask array above has probably been useless.

You don’t build Zeros Dask Array to fill them in later, or at least it’s really rare.

You especially don’t fill them by iterating through rows or columns!

Again, it is really hard to help given the example, but I think at some point you should do a map_blocks applyed on mlistack, not iterating.

I’m very sorry for it, I need to process multiple sets of images (animals, rows, columns), and this set of data is particularly large. First, I need to use a sliding window to select the pixels of the images, and then save the indices of the pixels that meet the conditions (the condition algorithm at this time is the BW algorithm I wrote myself), and save their coefficients that is calculated by bws algorithm.

import numpy as np
import dask.array
stack_image1 = dask.array.random.random((5,400, 600), chunks=(1,200,300))
stack_image2 = dask.array.random.random((5,400, 600), chunks=(1,200,300))
stack_image = stack_image1 + 1j*stack_image2
npages,nlines,nwidths = dask.array.shape(stack_image)
RadiusRow=(CalWin[0] - 1) // 2
RadiusCol=(CalWin[1] -1 ) // 2
InitRow=(CalWin[0] + 1) // 2
InitCol=(CalWin[1] + 1) // 2
mlistack = dask.array.abs(stack_image)
meanmli = dask.array.mean(mlistack, axis=0)
# print(mlistack[:,0:7,0:25])
nlines_EP,nwidths_EP = meanmli.shape
def slide_window(array):
    for kk in range(InitCol, nwidths_EP-RadiusCol):
        for ll in range(InitCol, nwidths_EP-RadiusCol):
            Matrix = mlistack[:,ll-RadiusRow-1:ll+RadiusRow,kk-RadiusCol-1:kk+RadiusCol]
            Ref = Matrix[:,InitRow-1,InitCol-1]
            Xarray = dask.array.blockwise(np.tile, 'ij', Ref[:, np.newaxis], 'ij', (1, CalWin[0] * CalWin[1]), 'ij', dtype=Ref.dtype)
            Xarray = dask.array.tile(Ref[:, np.newaxis], (1, CalWin[0] * CalWin[1]))
            Matrix_T = dask.array.transpose(Matrix, (0, 2, 1))
            Yarray = dask.array.reshape(Matrix_T, (Matrix_T.shape[0], CalWin[0] * CalWin[1]))
            return Xarray

result = dask.array.map_blocks(slide_window, mlistack, dtype=mlistack.dtype)

I tried to make the changes according to your suggestions, but it didn’t seem to achieve the results I wanted. Also, you mentioned that you don’t recommend using dask.array to generate an empty array. However, I am very confused about how to save values and return them if an empty array is not generated.If I have offended you, please forgive me

Not at all, I’m just trying to make our problem easier to solve.

However, you code and purpose is still unclear to me. It would be much easier if you could really simplify it. If I understand correctly, you do a sliding window pixel per pixel on the entire image to compute something?

If so, I guess you should take a look at Overlapping Computations — Dask documentation, and maybe dask.array.lib.stride_tricks.sliding_window_view — Dask documentation. Once inside map_partitons, or map_overlap, you work on Dask Array partitions, which are actualy Numpy Arrays, so you can use back your numpy code.

After map_partitions/map_overlap, your results will be concatenated into a new Dask Array. You can compute it to get the result as Numpy array, or save it to disk.

I’m glad you replied to me. Yes, your understanding is correct. I am using a sliding window to slide and calculate each pixel. I looked at the map_overlap function as you said and used it, but the result I got doesn’t seem to be what I wanted. I don’t know where the problem lies. Can you give me some advice? Here is my code:

import BWStest as bws
import numpy as np
from skimage.measure import label
import dask.array
from tqdm import tqdm
from dask.distributed import Client, LocalCluster, progress
client = Client()
CalWin = [7,25]
stack = []
thershold = 0.05
image1 = np.arange(5*400*600, dtype=np.int32).reshape(5,400,600)
image2 = np.arange(5*400*600, dtype=np.int32).reshape(5,400,600)
stack_image = np.array(image1 + 1j*image2).astype(np.complex64)
stack_image = dask.array.from_array(stack_image, chunks=(1,200,300))
npages,nlines,nwidths = dask.array.shape(stack_image)
RadiusRow=(CalWin[0] - 1) // 2   #3
RadiusCol=(CalWin[1] -1 ) // 2  #12
InitRow=(CalWin[0] + 1) // 2  #4
InitCol=(CalWin[1] + 1) // 2 #13
def abs(array):
  return dask.array.map_blocks(np.abs, array, dtype=np.float16, chunks=array.chunks)
mlistack = abs(stack_image)
re = client.compute(mlistack)
mlistack = np.pad(mlistack,((0, 0), (RadiusRow, RadiusRow), (RadiusCol, RadiusCol)), mode='symmetric')
meanmli = dask.array.mean(mlistack, axis=0)
nlines_EP,nwidths_EP = meanmli.shape
parameter = np.array([InitRow, InitCol, RadiusRow, RadiusCol, nlines_EP, nwidths_EP])
def slide_window(mlistack,parameter):
  for kk in range(parameter[1], parameter[5]-parameter[3]):
      for ll in range(parameter[0], parameter[4]-parameter[2]):
        Matrix = mlistack[:,ll-parameter[2]-1:ll+parameter[2],\
        kk-parameter[3]-1:kk+parameter[3]]
        #If sliding according to its own loop, the dimension of 
        #Ref should be (5,), but the result I obtained is (5,2,2), 
        #and the dimension of Matrix is also incorrect
        Ref = Matrix[:,InitRow-1:InitRow,InitCol-1:InitCol]
        Xarray = np.tile(Ref[:, np.newaxis], (1, CalWin[0] * CalWin[1]))
        Matrix_T = np.transpose(Matrix, (0, 2, 1))
      # The following statement will report an error due to incorrect dimensions
        Yarray = np.reshape(Matrix_T, (Matrix_T.shape[0], CalWin[0] * CalWin[1]))
        # # T = dask.array.from_array(bws.BWS(Xarray.compute(), Yarray.compute(), thershold))
        return Ref
re = dask.array.map_overlap(slide_window, mlistack, parameter=parameter)   

Do you have any good suggestions? Thank you again