Repeated recomputing of dask graph with random draws?

Hi - I’m trying to create uncorrelated signal streams with a shared frequency profile. Each one is basically an Nx1 sample of complex values with a specific frequency profile and random phases, pushed through an iFFT to give a real-valued random stream with the desired frequency profile. I thought I could speed this up with Dask, but I am not getting the results I expect.
See code sample below.
The setup, i.e. everything up to ifd2.compute(), takes tens of seconds for large values of Np. Executing the graph via compute() takes just a few hundred milliseconds.

My question is: can I do something in the setup so that it does a new random draw every time I call ifd2.compute(), without redoing the setup? Right now it seems that after a single setup, it pulls the same random values every time I call compute().

Thanks!

import numpy as np
import dask.array as da
import matplotlib.pyplot as plt

def fftnoise(f,ret="R",nn=None):
    """
    fftnoise creates a real-valued random array with specific frequency profile.

    :param f: frequency profile of return
    :param ret: return type: _R_eal, _I_mag, _C_omplex, or _M_ag (default="R")
    :param nn: optional value that causes status printing with a tag if included (default None)
    :return: Dask Array with length = len(f) (real-valued unless ret="C")
    """ 
    ff = da.array(f, dtype='complex')
    Np = (len(f) - 1) // 2
    if not (nn is None):
        print("fftnoise: Doing Random Draw {}".format(nn))
    phases = da.random.random(Np) * 2 * np.pi
    phases = da.cos(phases) + 1j * da.sin(phases)
    ff[1:Np+1] *= phases
    ff[-1:-1-Np:-1] = da.conj(ff[1:Np+1])
    if not (nn is None):
        print("fftnoise: Doing ifft {}".format(nn))
    if ret=="R":
        ifd=da.fft.ifft(ff).real
    elif ret=="I": 
        ifd=da.fft.ifft(ff).imag
    elif ret=="C":
        ifd=da.fft.ifft(ff)
    else:
        ifd=da.absolute(da.fft.ifft(ff))
    return da.rechunk(ifd,chunks=min(2**20,len(f)))

Np=2**20
## creates frequency profile with roots as low points
freqFunc=lambda x: ( x-0.2)*( x-0.1)*(x-0.3)*( x-0.4)
ff =  freqFunc(np.linspace(0,2,2*Np))
##mirror to other side of pi for ifft
ff[-1:-1-Np:-1] = ff[1:Np+1]

plt.figure(figsize=(20,5))
plt.plot(ff,label="Freq profile")
plt.legend()
ifd2=fftnoise(ff)
plt.figure(figsize=(20,5))
ifd2.compute()
plt.plot(ifd2.real,label="ifd2.real")
plt.legend()
plt.figure(figsize=(20,5))
plt.specgram(ifd2)
print("done")

## each of these subsequent calls produces the same values in ifd2
#ifd2.compute()
#...
#ifd2.compute()

Welcome @erikson1970, thanks for the nice reproducible example.

The seeds for the random draw are created when you set up the Dask array (when calling da.random.random). In fact, I wouldn’t be surprised if generating the seeds takes up a significant chunk of your setup time. So to answer your immediate question: there isn’t anything you can do to get a new random draw every time you call ifd2.compute(), because it recomputes the same Dask array with the same parameters (including seeds). In fact, the Dask distributed scheduler will avoid recomputing the result altogether and just reuse the existing result if it’s available (see, e.g., this discussion).

You can, however, create many Dask arrays, each of which will get its own set of seeds, and compute them all at once with a distributed scheduler:

import distributed
client = distributed.Client()

ifds = []
for _ in range(100):
    ifds.append(fftnoise(ff))

futures = client.compute(ifds)    # schedules your many spectra in parallel
results = client.gather(futures)  # waits for them and returns the NumPy arrays

If I understand your question correctly, the above might be unsatisfying because you are still paying the price of setup for each of your arrays, which is currently somewhat expensive. You might instead try reworking your fftnoise function to return a NumPy array (called fftnoise2 below), then run it N times with pure=False as documented in the above link. Something like:

futs = []
ff2 = client.scatter(ff)   # ship the frequency profile to the cluster once
for _ in range(10):
    # pure=False so that the same result isn't just reused.
    futs.append(client.submit(fftnoise2, ff2, pure=False)) 
    
results = client.gather(futs)
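
The snippet above uses fftnoise2 without defining it. As a rough sketch of what such a function could look like, here is a NumPy-only port of the real-valued branch of fftnoise. The name fftnoise2 comes from the snippet above; the body and the optional rng argument are my assumptions, not something from the original post. When rng is omitted, each call builds a fresh generator seeded from OS entropy, so every task submitted with pure=False should get its own independent draw.

```python
import numpy as np

def fftnoise2(f, rng=None):
    """Hypothetical NumPy-only variant of fftnoise (real-valued output only).

    f   : real frequency profile (1-D array-like)
    rng : optional np.random.Generator; a fresh, OS-seeded one is used if None
    """
    rng = np.random.default_rng() if rng is None else rng
    ff = np.array(f, dtype=complex)              # copy, so the caller's profile is untouched
    Np = (len(ff) - 1) // 2
    # Unit-magnitude complex numbers with uniformly random phase.
    phases = np.exp(1j * rng.uniform(0.0, 2.0 * np.pi, Np))
    ff[1:Np + 1] *= phases
    # Mirror the conjugate onto the upper half so the inverse FFT is real.
    ff[-1:-1 - Np:-1] = np.conj(ff[1:Np + 1])
    return np.fft.ifft(ff).real

# Two calls on the same profile give different streams.
profile = np.ones(16)
a = fftnoise2(profile)
b = fftnoise2(profile)
```

Because the function is plain NumPy, there is no graph to build, so the per-call setup cost should be small compared with constructing a new Dask array each time. Passing an explicit rng is only useful if you want reproducible draws.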