Why is it undesirable for delayed functions to mutate inputs?

Hi @zzhengnan, thanks for reopening your question here!

This is correct. If the same input is used in more than one node of the Dask task graph, mutating it can cause unpredictable results depending on the details of task scheduling (which might be different between Dask versions, or depend on latencies, or other vagaries).

Here’s a quick example:

import random
import time

import dask
import numpy

# pure=False allows the random call to be reevaluated
# with each compute()
@dask.delayed(pure=False)
def doubler(x):
    time.sleep(random.random())
    x *= 2
    return x

@dask.delayed(pure=False)
def adder(x):
    time.sleep(random.random())
    x += 1
    return x

results = []
for _ in range(10):
    x = numpy.ones(1)
    y = doubler(x) + adder(x)
    results.append(y.compute())

Depending on the order in which the mutations of x happen, you can get either 6 or 8, when what I really wanted was 4.
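
To see where 6 and 8 come from, here is a hand-trace of the two orderings (plain NumPy, no Dask), assuming both tasks receive the very same ndarray object, which is what the local schedulers do:

import numpy

# Ordering 1: doubler runs first, then adder
x = numpy.ones(1)
x *= 2        # doubler mutates the shared array -> [2.]
d = x         # doubler returns that same array
x += 1        # adder mutates the shared array -> [3.]
a = x         # adder returns that same array
print(d + a)  # [6.]  (both names point at the array holding 3)

# Ordering 2: adder runs first, then doubler
x = numpy.ones(1)
x += 1        # adder mutates the shared array -> [2.]
a = x
x *= 2        # doubler mutates the shared array -> [4.]
d = x
print(d + a)  # [8.]  (both names point at the array holding 4)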

If I rewrite the functions to avoid mutation:

@dask.delayed(pure=False)
def doubler(x):
    time.sleep(random.random())
    return x * 2

@dask.delayed(pure=False)
def adder(x):
    time.sleep(random.random())
    return x + 1

I consistently get the expected result of 4.
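
If the in-place update itself is hard to avoid, another option (just a sketch, not the only way) is to copy the input at the top of the function, so the mutation stays local to that task:

@dask.delayed(pure=False)
def doubler(x):
    x = x.copy()  # work on a private copy; the caller's array is untouched
    time.sleep(random.random())
    x *= 2
    return x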

Another reason to avoid mutation is that things will continue working the same way if you move to a distributed setting. If your code relies on mutating a local variable under the synchronous Dask scheduler, you'll probably see something quite different under the distributed scheduler, since the workers no longer share memory.
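
As a rough sketch of what I mean, reusing the mutating doubler/adder from above on a local distributed cluster (assuming dask.distributed is installed):

from dask.distributed import Client

client = Client()  # local cluster; workers run in separate processes by default

x = numpy.ones(1)
y = doubler(x) + adder(x)
print(y.compute())  # likely [4.] here, since each task deserializes its own copy of x

Here the mutations no longer interfere because the input is serialized out to the workers, but that is an implementation detail you shouldn't rely on; the safe pattern is simply not to mutate inputs.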

I think you’re absolutely right; my example used a mutable datatype for exactly this reason. A really nice docs contribution would be to provide a better example.
