Why is it undesirable for delayed functions to mutate inputs?

Hi all! Regarding the following piece of best practice from Best Practices — Dask documentation, why is it not ideal for delayed functions to mutate their inputs?

Don’t mutate inputs
Your functions should not change the inputs directly.

Is it due to considerations for potential race conditions when the same function has been scheduled to run more than once in a concurrent fashion?

Separately, I’m wondering if there’s a more intuitive example we should consider using, as opposed to the current x += 1 and x = x + 1. For integers (and potentially other immutable types as well), I think the two are equivalent.
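For what it’s worth, the two spellings do diverge once the type is mutable: augmented assignment mutates the existing object in place, while `x = x + 1` rebinds the name to a brand-new object. A quick plain-Python sketch with lists (no Dask involved):

```python
# For immutable ints, x += 1 and x = x + 1 behave identically.
# For mutable types such as lists, they do not:
a = [1]
alias = a
a += [2]        # augmented assignment extends the SAME list in place
print(alias)    # [1, 2] -- the alias sees the mutation

b = [1]
alias = b
b = b + [2]     # plain assignment builds a NEW list and rebinds b
print(alias)    # [1] -- the alias still points at the original
```

So an example built on a list (or a numpy array) would expose the difference that the integer example hides.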

Thanks!

(This question was originally posted on the dask/dask GitHub issue tracker.)

Hi @zzhengnan, thanks for reopening your question here!

This is correct. If the same input is used in more than one node of the Dask task graph, mutating it can cause unpredictable results depending on the details of task scheduling (which might be different between Dask versions, or depend on latencies, or other vagaries).

Here’s a quick example:

import random
import time

import dask
import numpy

# pure=False allows the random call to be reevaluated
# with each compute()
@dask.delayed(pure=False)
def doubler(x):
    time.sleep(random.random())
    x *= 2
    return x

@dask.delayed(pure=False)
def adder(x):
    time.sleep(random.random())
    x += 1
    return x

results = []
for _ in range(10):
    x = numpy.ones(1)
    y = doubler(x) + adder(x)
    results.append(y.compute())

Depending on the order in which the mutations of x occur, you can get either 6 or 8, when what I really wanted was 4.
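The same hazard can be reproduced without Dask at all. Here’s a sketch using a plain thread pool, with a one-element list standing in for the array (the doubler/adder names just mirror the example above):

```python
import random
import time
from concurrent.futures import ThreadPoolExecutor

def doubler(x):
    time.sleep(random.random() / 10)
    x[0] *= 2
    return x            # returns the SAME list object, not a copy

def adder(x):
    time.sleep(random.random() / 10)
    x[0] += 1
    return x

x = [1]
with ThreadPoolExecutor() as pool:
    d = pool.submit(doubler, x)
    a = pool.submit(adder, x)
    result = d.result()[0] + a.result()[0]

# Both tasks alias the same list, so the sum is typically 6 or 8
# depending on which mutation lands last -- never a reliable 4.
print(result)
```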

If I rewrite to avoid mutations:

@dask.delayed(pure=False)
def doubler(x):
    time.sleep(random.random())
    return x * 2

@dask.delayed(pure=False)
def adder(x):
    time.sleep(random.random())
    return x + 1

I get the expected result of 4.

Another reason to avoid mutation is that things will continue to work the same way if you move to a distributed setting. If a local variable is mutated under the synchronous Dask scheduler, you’ll probably see something quite different with the distributed one, since the workers no longer share memory.
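To make that concrete: a remote worker operates on a deserialized copy of the input, so in-place changes never make it back to the caller. This can be simulated with a pickle round-trip (only an analogy for how schedulers ship data between processes):

```python
import pickle

def mutate(x):
    x[0] += 1       # mutates whatever object the function actually received
    return x

data = [1]
# A remote worker gets a serialized copy, roughly like this:
worker_copy = pickle.loads(pickle.dumps(data))
result = mutate(worker_copy)

print(result)   # [2] -- the worker's copy was mutated and returned
print(data)     # [1] -- the caller's original object never changed
```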

I think you’re absolutely right. My example used a mutable datatype for exactly that reason. I think a really nice docs contribution would be to provide a better example.


Thanks @ian for the reply! And apologies for the delay, as I’ve been on the road recently.

I’ll re-examine the existing docs along with your explanation. If I can come up with a more intuitive example, I’ll be sure to open a PR.


Thanks for the help, but I just wanted to call out that your code was always returning 4 for me. If I change it to what I have below, it shows the 6/8 behavior that you referenced.

# pure=False allows the random call to be reevaluated
# with each compute()
@dask.delayed(pure=False)
def doubler(x):
    time.sleep(random.random())
    x[0] *= 2
    return x

@dask.delayed(pure=False)
def adder(x):
    time.sleep(random.random())
    x[0] += 1
    return x

results = []
for _ in range(10):
    x = numpy.ones(1)
    y = doubler(x) + adder(x)
    results.append(y.compute())

@jefsayshi Interesting, my initial example still works for me. I wonder if it depends on the version of numpy you are using (I am on 1.21.4)? There has been a lot of internal work on numpy arrays recently; I could see there being subtle differences depending on copy/view semantics.

I might put that down as another reason to avoid in-place operations.

I changed to your version of numpy (I was on the older 1.18.5) and did see the 6/8 behavior with your code.


@ian Looking back at your example more closely, I don’t understand why we would get 8 at all.

I get how we might get 6. This happens if adder gets executed before doubler. More concretely, [1] += 1 -> [2] followed by [2] *= 2 -> [4], giving [4] + [2] = [6].

By contrast, if doubler gets executed before adder, shouldn’t the result be 5 instead? That is, [1] *= 2 -> [2] followed by [2] += 1 -> [3], giving [2] + [3] = [5].

What am I missing here? Thanks.

Hi @zzhengnan, sorry for the slow reply. It’s a good question, and I had to take a closer look at the code! I think what’s going on is this:

Scenario A: adder finishes before doubler. In this case [1] += 1 -> [2], followed by [2] *= 2 -> [4], just like you say. But! Remember they both return the mutated x, rather than the value of x or a copy of it, so the final step is actually [4] + [4] = [8].

Scenario B: doubler finishes before adder. In this case [1] *= 2 -> [2], followed by [2] += 1 -> [3], again as you say. And then in the final step we get [3] + [3] = [6].

So that’s how we get 6 or 8. We can actually get the 5/6 result that you came up with by still mutating the inputs, but then returning a copy of the mutated input in both functions: return numpy.copy(x).
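The two scenarios can also be replayed deterministically, with no concurrency at all, which might make the aliasing easier to see (plain lists standing in for the numpy array):

```python
def doubler(x):
    x[0] *= 2
    return x            # returns the same mutated object

def adder(x):
    x[0] += 1
    return x

# Scenario A: adder runs first, then doubler.
x = [1]
a = adder(x)            # x is now [2]; a aliases x
d = doubler(x)          # x is now [4]; d aliases x too
print(d[0] + a[0])      # 8 -- both names point at the same [4]

# Scenario B: doubler runs first, then adder.
x = [1]
d = doubler(x)          # [2]
a = adder(x)            # [3]
print(d[0] + a[0])      # 6 -- both names point at the same [3]

# Returning a copy of the mutated input decouples the two results:
def doubler_copy(x):
    x[0] *= 2
    return list(x)      # a copy, standing in for numpy.copy(x)

def adder_copy(x):
    x[0] += 1
    return list(x)

x = [1]
d = doubler_copy(x)     # [2]
a = adder_copy(x)       # [3]
print(d[0] + a[0])      # 5 -- the "intuitive" answer for this ordering
```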

Phew, concurrency under mutating data.