Dask+Numba: problem moving dask-array to GPU & result doesn't fit into memory


I am trying to accelerate a data processing program by using dask and GPU (numba), but facing two following issues now. There’s also a simplified code attached, please help. Thanks a lot!

  1. I hit a dask-array read-only error while trying to move the data from CPU to GPU, thus I used cupy.asarry(), which copies the data to GPU, as a workaround. I don’t think this approach can handle the memory properly and probably is causing some memory leakage problems.
  2. The result of compute() is approximately larger than 1TB and doesn’t fit into my memory. Is possible to continue writing the result to the disk while computing?
def cuda_calculation(X, Y, out):
    # some calculation here #

def foo(X, Y):
    cuda_X = cupy.asarray(X) 
    cuda_Y = cupy.asarray(Y)
    cuda_out = cupyp.zeros((Y.shape[0], Y.shape[1], X.shape[1], X.shape[2]))
    cuda_corr[(8, 8, 8), (8, 8, 8)](cuda_X.T, cuda_Y, cuda_out)
    out = cupyp.asnumpy(cuda_out)
    return out

client = Client(n_workers=4, threads_per_worker=2)

# da_x and da_y are both dask-arrays
# result is larger than 1TB

result = da.map_blocks(foo, da_x, da_y, chunks=(24, 3, 721, 90), drop_axis=0, new_axis=[1,2], dtype=np.float64)
result_ = result.compute()

Hi @timost1234, welcome to Dask Discourse forum!

How do you initially create da_x, and da_y? Are you using cupy?

Yeas, just use to_something, like to_zarr, instead of compute. See Create Dask Arrays — Dask documentation.

Thanks about this, it’s nice to have it. We could better help with a complete reproducible example though, could you do that?

Hi @guillaumeeb Thanks for your reply!

I will attach the complete code here and read the documents you povide.

da_x and da_y are dask-arrays acquired using xarray.open_dataset("a NetCDF4 file").data. I could not provide the raw data, but using da.from_array() to create dummy data might be enough to reproduce rest of the code.

import time
import math

import numpy as np
import netCDF4 as nc
from numba import cuda
import cupy as cp

from dask.distributed import Client, progress
import dask.array as da
from dask_cuda import LocalCUDACluster
import xarray as xr

def cuda_corr(X, Y, out):
    X: ([1440/c], 721, n)
    Y: ([480/c], 371, n)
    out: [[480/c], 371, 721, [1440/c]]
    start_i, start_j, start_k = cuda.grid(3)
    stride_i, stride_j, stride_k = cuda.gridsize(3)
    n = X.shape[2] # Time
    for k in range(start_k, X.shape[0], stride_k):
        for j in range(start_j, X.shape[1], stride_j):
            for i in range(start_i, Y.shape[0], stride_i):
                for p in range(Y.shape[1]):
                    sum_X = 0.
                    sum_Y = 0.
                    sum_XY = 0.
                    squareSum_X = 0.
                    squareSum_Y = 0.
                    for h in range(n):
                        sum_X = sum_X + X[k, j, h]
                        sum_Y = sum_Y + Y[i, p, h]
                        sum_XY = sum_XY + X[k, j, h] * Y[i, p, h]
                        squareSum_X = squareSum_X + X[k, j, h] * X[k, j, h]
                        squareSum_Y = squareSum_Y + Y[i, p, h] * Y[i, p, h]

                    out[i, p, j, k] = (n * sum_XY - sum_X * sum_Y)/ \
                           (math.sqrt((n * squareSum_X - \
                           sum_X * sum_X)* (n * squareSum_Y - \
                           sum_Y * sum_Y)))
def foo(X, Y):
    cuda_X = cp.asarray(X)  # copying dask-array from CPU to GPU
    cuda_Y = cp.asarray(Y)  # copying dask-array from CPU to GPU
    cuda_out = cp.zeros((Y.shape[0], Y.shape[1], X.shape[1], X.shape[2])) # create a cupy array directly on GPU
    cuda_corr[(8, 8, 8), (8, 8, 8)](cuda_X.T, cuda_Y, cuda_out)
    out = cp.asnumpy(cuda_out) # converting cupy array to numpy array
    return out

fileName="tp.nc" #netCDF (time: 10958, lon: 1440, lat: 721) 
nc1 = xr.open_dataset(fileName, chunks={'time': -1, 'lon': 90, 'lat': -1})
da_x = nc1['tp'].sel({'time': slice("2000-01-01", "2010-01-10")}).data

fileName="rr.nc" #netCDF (time: 8401, lon: 371, lat: 480) 
nc2 = xr.open_dataset(fileName, chunks={'time': -1, 'lon': -1, 'lat': 24})
da_y = nc2['rr'].sel({'time': slice("2000-01-01", "2010-01-10")}).data

# da_x: dask.array<getitem, shape=(3663, 721, 1440), dtype=float32, chunksize=(3663, 721, 90), chunktype=numpy.ndarray>,
# da_y: dask.array<getitem, shape=(3663, 480, 371), dtype=float32, chunksize=(3663, 24, 371), chunktype=numpy.ndarray>

client = Client(n_workers=4, threads_per_worker=2)
result = da.map_blocks(foo, da_x, da_y, chunks=(24, 371, 721, 90), drop_axis=0, new_axis=[1,2], dtype=np.float64)
result_ = result.compute()

Hope these materials could provide more information. Thanks again!