Ordering tasks that are waiting for data using da.from_delayed

I am performing near-real-time processing of data distributed over N files (N = 40). The files come in one by one over a period of around ten minutes. Previously, I waited for all files to be present before starting my processing. To improve timeliness, I am trying to process each file as it arrives. To do so, I create 40 dask arrays with da.from_delayed, one for each file. Each underlying dask delayed object waits for a specific file and starts processing it as soon as it arrives.

It works! But the tasks are not scheduled in the order in which I created them. The result: if I have 8 dask workers, tasks waiting for files 33–40 might take up all workers, while tasks for files 1–32, whose files may already be there, could be doing work but aren't, because no worker is free. Can I tell the dask scheduler to schedule the tasks I submit first before the tasks I submit last, or otherwise force or nudge it into scheduling some tasks before others?
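The effect described above can be reproduced without Dask. The sketch below is an analogy, not Dask's actual scheduler: a standard-library `ThreadPoolExecutor` with two threads stands in for the worker pool, and a `threading.Event` per file (a hypothetical stand-in) marks that file's arrival. Submitting tasks in arrival order lets each file be processed as soon as it lands; submitting them in reverse order ties both threads up waiting for the last two files, so the early files sit idle.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

N_FILES = 6
N_WORKERS = 2
DELAY = 0.05  # seconds between simulated file arrivals


def run(submission_order):
    """Return the order in which the simulated files finish processing."""
    arrived = [threading.Event() for _ in range(N_FILES)]
    done_order = []

    def wait_and_process(i):
        arrived[i].wait()  # blocks this worker thread until file i "arrives"
        done_order.append(i)  # list.append is thread-safe in CPython

    def simulate_arrivals():
        for ev in arrived:  # files arrive in index order 0, 1, 2, ...
            time.sleep(DELAY)
            ev.set()

    writer = threading.Thread(target=simulate_arrivals)
    writer.start()
    with ThreadPoolExecutor(max_workers=N_WORKERS) as pool:
        for i in submission_order:
            pool.submit(wait_and_process, i)
    # leaving the with-block waits for all submitted tasks
    writer.join()
    return done_order


# Submitted in arrival order: each file is handled right after it lands.
in_order = run(range(N_FILES))
# Submitted in reverse: both threads block on files 5 and 4, so files 0-3
# are only processed after file 4 has arrived.
reversed_order = run(reversed(range(N_FILES)))
print(in_order)
print(reversed_order)
```

Dask's local schedulers order tasks with their own heuristics rather than by creation order, so nothing guarantees the first case.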

With the dask distributed scheduler, I can call secede() and rejoin() within each task while it waits for its file. This works: a waiting task gives up its worker slot, so a task whose file has already arrived is soon scheduled and can get to work. However, the software is not fully compatible with dask distributed, and some operations fail with errors pointing to failed serialisation (“cannot be converted to a Python object for pickling”). Another workaround would be to use as many workers as I have files, but then I cannot tune the number of workers to the machine's resources.

With the default scheduler, can I get dask to work on tasks processing files that arrive early before it works on tasks processing files that arrive late?

Hi @gerrit, welcome to the Dask community!

It would be much easier if you provided a minimal reproducible example, or at least some code snippets.

As far as I know, the local threaded scheduler does not offer much control over scheduling order. The best option would be to use Distributed, either with the Futures API or with Delayed and task priorities.
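A minimal sketch of the Futures API with priorities, assuming `dask.distributed` is installed. `load` is a hypothetical stand-in for the task that waits for and reads one file. `Client.submit` accepts a `priority` keyword, and higher values are preferred when several tasks are ready to run, so files expected earlier get larger priorities here. Note that a priority only decides which ready task starts next; it does not release a thread that is already blocked inside a task.

```python
import numpy as np
from dask.distributed import Client

N = 40
SHP = (10, 10, 10)


def load(i):
    # Hypothetical stand-in for "wait for file i, then read it"
    return np.full(SHP, float(i))


# In-process cluster: worker threads only, no worker subprocesses
with Client(processes=False, n_workers=1, threads_per_worker=4) as client:
    # Higher priority runs first when several tasks are ready,
    # so earlier files (smaller i) get larger priorities
    futures = [client.submit(load, i, priority=N - i) for i in range(N)]
    # Futures nested in the list are resolved to their results before sum runs
    total = client.submit(sum, futures).result()

print(total.mean())  # 0 + 1 + ... + 39 = 780.0
```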

But depending on your workflow, there might be some solutions.

Hi @guillaumeeb, you’re right, of course. Sorry for not providing this yesterday. Looks like I can’t edit my original post any more. Here is a working example that illustrates and reproduces my problem:

import dask
import dask.array as da
import numpy as np
import time
import netCDF4

N = 40
shp = (10, 10, 10)

@dask.delayed
def get_value(fn, max_tries=100, wait=5):
    print("Waiting for", fn)
    for _ in range(max_tries):
        try:
            nc = netCDF4.Dataset(fn, "r")
        except FileNotFoundError:
            time.sleep(wait)
        else:
            break
    else:
        raise TimeoutError("File failed to materialise")
    print("Found", fn)
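    # close the file after reading, and cast so the values match the
    # dtype declared in da.from_delayed (the test data are integers)
    with nc:
        return nc["data"][:].astype("float32")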

coll = None
for i in range(N):
    arr = da.from_delayed(get_value(f"/tmp/test{i:d}.nc"), shape=shp, dtype="float32")
    if coll is None:
        coll = arr
    else:
        coll += arr

print(coll.mean().compute())

(In my actual use case, the computation is much more expensive than a simple summation; some computation can begin as soon as one file is available, later steps depend on multiple files being available, and the final steps need all files.)

Simulation script to create the files:

# counterpart to test-dask-data-waiting-multi.py

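import os
import numpy as np
import time
import xarray as xr

N = 40
shp = (10, 10, 10)

for i in range(N):
    ds = xr.Dataset()
    ds["data"] = xr.DataArray(np.random.randint(0, 10, size=shp), dims=["z", "y", "x"])
    # write under a temporary name, then rename, so the reader never opens a half-written file
    ds.to_netcdf(f"/tmp/test{i:d}.nc.part")
    os.replace(f"/tmp/test{i:d}.nc.part", f"/tmp/test{i:d}.nc")
    time.sleep(1)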

Script output:

Waiting for /tmp/test38.nc
Waiting for /tmp/test36.nc
Waiting for /tmp/test2.nc
Waiting for /tmp/test20.nc
Waiting for /tmp/test33.nc
Waiting for /tmp/test5.nc
Waiting for /tmp/test4.nc
Waiting for /tmp/test37.nc
Found /tmp/test4.nc
Found /tmp/test5.nc
Found /tmp/test2.nc
Waiting for /tmp/test12.nc
Waiting for /tmp/test16.nc
Waiting for /tmp/test11.nc
Found /tmp/test11.nc
Found /tmp/test12.nc
Waiting for /tmp/test9.nc
Waiting for /tmp/test26.nc
Found /tmp/test9.nc
Waiting for /tmp/test22.nc
Found /tmp/test20.nc
Waiting for /tmp/test10.nc
Found /tmp/test10.nc
Waiting for /tmp/test8.nc
Found /tmp/test8.nc
Waiting for /tmp/test24.nc
Found /tmp/test16.nc
Waiting for /tmp/test31.nc
Found /tmp/test24.nc
Waiting for /tmp/test6.nc
Found /tmp/test6.nc
Waiting for /tmp/test21.nc
Found /tmp/test21.nc
Waiting for /tmp/test27.nc
Found /tmp/test22.nc
Waiting for /tmp/test14.nc
Found /tmp/test14.nc
Waiting for /tmp/test18.nc
Found /tmp/test18.nc
Waiting for /tmp/test1.nc
Found /tmp/test1.nc
Waiting for /tmp/test32.nc
Found /tmp/test27.nc
Waiting for /tmp/test3.nc
Found /tmp/test26.nc
Found /tmp/test3.nc
Waiting for /tmp/test19.nc
Waiting for /tmp/test39.nc
Found /tmp/test19.nc
Waiting for /tmp/test23.nc
Found /tmp/test23.nc
Waiting for /tmp/test7.nc
Found /tmp/test7.nc
Waiting for /tmp/test13.nc
Found /tmp/test13.nc
Waiting for /tmp/test17.nc
Found /tmp/test17.nc
Waiting for /tmp/test34.nc
Found /tmp/test32.nc
Found /tmp/test38.nc
Found /tmp/test37.nc
Found /tmp/test36.nc
Found /tmp/test39.nc
180.418

As we can see, the script starts by waiting for test38, test36, test2 and test20. These tasks occupy workers while idly waiting for their files to appear, whereas tasks that could already start calculating with test0, test1, test3, etc. are waiting for a free worker even though their files are available. With dask.distributed, I can solve this problem as follows:

import dask.array as da
import numpy as np
import time
import netCDF4
import dask.distributed

N = 40
shp = (10, 10, 10)

@dask.delayed
def get_value(fn, max_tries=100, wait=5):
    print("Waiting for", fn)
    for _ in range(max_tries):
        try:
            nc = netCDF4.Dataset(fn, "r")
        except FileNotFoundError:
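            # file not there yet: leave the worker's thread pool while sleeping,
            # so another task (e.g. one whose file has arrived) can use the slot
            dask.distributed.secede()
            time.sleep(wait)
            dask.distributed.rejoin()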
        else:
            break
    else:
        raise TimeoutError("File failed to materialise")
    print("Found", fn)
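    # close the file after reading, and cast so the values match the
    # dtype declared in da.from_delayed (the test data are integers)
    with nc:
        return nc["data"][:].astype("float32")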

coll = None
for i in range(N):
    arr = da.from_delayed(get_value(f"/tmp/test{i:d}.nc"), shape=shp, dtype="float32")
    if coll is None:
        coll = arr
    else:
        coll += arr

if __name__ == "__main__":
    with dask.distributed.Client():
        print(coll.mean().compute())

with the corresponding script output:

Waiting for /tmp/test7.nc
Waiting for /tmp/test27.nc
Waiting for /tmp/test35.nc
Waiting for /tmp/test28.nc
Waiting for /tmp/test18.nc
Waiting for /tmp/test19.nc
Waiting for /tmp/test10.nc
Waiting for /tmp/test26.nc
Waiting for /tmp/test6.nc
Waiting for /tmp/test4.nc
Waiting for /tmp/test32.nc
Waiting for /tmp/test14.nc
Waiting for /tmp/test15.nc
Waiting for /tmp/test24.nc
Waiting for /tmp/test23.nc
Waiting for /tmp/test5.nc
Waiting for /tmp/test11.nc
Waiting for /tmp/test3.nc
Waiting for /tmp/test21.nc
Waiting for /tmp/test16.nc
Waiting for /tmp/test9.nc
Waiting for /tmp/test37.nc
Waiting for /tmp/test31.nc
Waiting for /tmp/test30.nc
Waiting for /tmp/test17.nc
Waiting for /tmp/test36.nc
Waiting for /tmp/test0.nc
Waiting for /tmp/test20.nc
Waiting for /tmp/test38.nc
Waiting for /tmp/test29.nc
Waiting for /tmp/test12.nc
Waiting for /tmp/test8.nc
Waiting for /tmp/test2.nc
Waiting for /tmp/test25.nc
Waiting for /tmp/test1.nc
Waiting for /tmp/test13.nc
Waiting for /tmp/test33.nc
Waiting for /tmp/test34.nc
Waiting for /tmp/test22.nc
Found /tmp/test4.nc
Found /tmp/test3.nc
Found /tmp/test0.nc
Found /tmp/test2.nc
Found /tmp/test1.nc
Found /tmp/test7.nc
Found /tmp/test6.nc
Found /tmp/test5.nc
Found /tmp/test9.nc
Found /tmp/test8.nc
Found /tmp/test10.nc
Found /tmp/test14.nc
Found /tmp/test12.nc
Found /tmp/test11.nc
Found /tmp/test13.nc
Found /tmp/test16.nc
Found /tmp/test18.nc
Found /tmp/test19.nc
Found /tmp/test15.nc
Found /tmp/test17.nc
Found /tmp/test21.nc
Found /tmp/test24.nc
Found /tmp/test23.nc
Found /tmp/test20.nc
Found /tmp/test22.nc
Found /tmp/test27.nc
Found /tmp/test26.nc
Found /tmp/test28.nc
Found /tmp/test29.nc
Found /tmp/test25.nc
Found /tmp/test32.nc
Found /tmp/test31.nc
Found /tmp/test30.nc
Found /tmp/test34.nc
Found /tmp/test33.nc
Found /tmp/test39.nc
Found /tmp/test35.nc
Found /tmp/test37.nc
Found /tmp/test38.nc
Found /tmp/test36.nc
179.849

but in my actual use case, I am using the rasterio library to write a GeoTIFF image, and sadly the rasterio library does not work with dask.distributed.

The question is: with the standard scheduler, is there any way to keep waiting tasks from occupying workers, or otherwise to get the tasks whose files are already available to work as early as possible? I could get the same effect by using 40 workers, but that might affect other parts of my processing.

If you cannot use the Distributed Client, one solution I can think of is to compute, or launch the computation, every nworkers files. For example:

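coll = None
n_workers = 10  # number of files per batch, e.g. the number of worker threads
for i in range(N):
    arr = da.from_delayed(get_value(f"/tmp/test{i:d}.nc"), shape=shp, dtype="float32")
    if coll is None:
        coll = arr
    else:
        coll += arr
    # every n_workers files, compute everything submitted so far; with the local
    # scheduler, persist() blocks until the result is in memory
    if (i + 1) % n_workers == 0:
        coll = coll.persist()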

Or just run multiple compute loops, with one compute call per batch of files.
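The multiple-compute-loops idea could look like this with the default threaded scheduler. This is a sketch under assumptions: `.npy` files written by a hypothetical `simulate_arrivals` thread replace the netCDF files (so only NumPy and Dask are needed), and `batched_total` is an illustrative name. Each `compute` call contains only one batch of files, so a thread can never be stuck waiting for a file from a later batch; the cost is that a batch cannot start until the previous one has finished.

```python
import os
import tempfile
import threading
import time

import dask
import dask.array as da
import numpy as np

N = 12
N_WORKERS = 4
SHP = (10, 10, 10)


@dask.delayed
def get_value(fn, max_tries=200, wait=0.05):
    # Stand-in for the netCDF4 reader: poll until the .npy file appears
    for _ in range(max_tries):
        if os.path.exists(fn):
            return np.load(fn)
        time.sleep(wait)
    raise TimeoutError(f"{fn} failed to materialise")


def simulate_arrivals(paths, delay=0.05):
    # Hypothetical writer: one file every `delay` seconds, file i holds the value i
    for i, fn in enumerate(paths):
        time.sleep(delay)
        tmp = fn + ".part"
        with open(tmp, "wb") as f:
            np.save(f, np.full(SHP, float(i)))
        os.replace(tmp, fn)  # rename so readers never see a half-written file


def batched_total(paths, batch_size):
    total = None
    for start in range(0, len(paths), batch_size):
        batch = paths[start:start + batch_size]
        # Only this batch is in the graph, so no thread can be tied up
        # waiting for a file that belongs to a later batch.
        arrs = [da.from_delayed(get_value(p), shape=SHP, dtype="float64") for p in batch]
        part = da.stack(arrs).sum(axis=0).compute(scheduler="threads", num_workers=batch_size)
        total = part if total is None else total + part
    return total


tmpdir = tempfile.mkdtemp()
paths = [os.path.join(tmpdir, f"test{i}.npy") for i in range(N)]
writer = threading.Thread(target=simulate_arrivals, args=(paths,))
writer.start()
total = batched_total(paths, N_WORKERS)
writer.join()
print(total.mean())  # files hold 0, 1, ..., 11, so every element is 66.0
```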

However:

 I am using the rasterio library to write a GeoTIFF image, and sadly the rasterio library does not work with dask.distributed.

I’m really surprised by this: I have used rioxarray heavily, which relies on rasterio to build Xarray datasets, and it works quite well in a distributed environment. Maybe try importing rasterio inside your tasks?

The best solution using Distributed would probably be to use the Futures API to launch tasks as files are received.
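One way the Futures approach might look, assuming `dask.distributed` is available: the client process does the waiting, polling for new files and calling `client.submit` only once a file exists, so no worker thread is ever blocked on a missing file. As in the sketches above, `.npy` files stand in for the netCDF files, and `process` and `simulate_arrivals` are hypothetical helpers.

```python
import os
import tempfile
import threading
import time

import numpy as np
from dask.distributed import Client

N = 8
SHP = (10, 10, 10)


def process(fn):
    # Hypothetical per-file work; the real code would read the netCDF file here
    return np.load(fn) * 2


def simulate_arrivals(paths, delay=0.05):
    # Hypothetical writer: one file every `delay` seconds, file i holds the value i
    for i, fn in enumerate(paths):
        time.sleep(delay)
        tmp = fn + ".part"
        with open(tmp, "wb") as f:
            np.save(f, np.full(SHP, float(i)))
        os.replace(tmp, fn)  # rename so readers only ever see complete files


tmpdir = tempfile.mkdtemp()
paths = [os.path.join(tmpdir, f"test{i}.npy") for i in range(N)]
writer = threading.Thread(target=simulate_arrivals, args=(paths,))
writer.start()

# In-process cluster: one worker with four threads
with Client(processes=False, n_workers=1, threads_per_worker=4) as client:
    pending = list(paths)
    futures = []
    deadline = time.monotonic() + 60
    while pending:
        if time.monotonic() > deadline:
            raise TimeoutError(f"{len(pending)} files failed to materialise")
        # Submit a task only once its file exists, so no worker thread waits
        for fn in [p for p in pending if os.path.exists(p)]:
            futures.append(client.submit(process, fn))
            pending.remove(fn)
        time.sleep(0.01)
    # Combine all per-file results on the cluster
    total = client.submit(sum, futures).result()

writer.join()
print(total.mean())  # (0 + 1 + ... + 7) * 2 = 56.0
```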