Hi @guillaumeeb, you’re right, of course. Sorry for not providing this yesterday. Looks like I can’t edit my original post any more. Here is a working example that illustrates and reproduces my problem:
```python
import dask
import dask.array as da
import numpy as np
import time
import netCDF4

N = 40
shp = (10, 10, 10)


@dask.delayed
def get_value(fn, max_tries=100, wait=5):
    print("Waiting for", fn)
    for _ in range(max_tries):
        try:
            nc = netCDF4.Dataset(fn, "r")
        except FileNotFoundError:
            time.sleep(wait)  # this worker thread stays busy while sleeping
        else:
            break
    else:
        raise TimeoutError("File failed to materialise")
    print("Found", fn)
    data = nc["data"][:]
    nc.close()
    return data


coll = None
for i in range(N):
    arr = da.from_delayed(get_value(f"/tmp/test{i:d}.nc"), shape=shp, dtype="float32")
    if coll is None:
        coll = arr
    else:
        coll += arr

print(coll.mean().compute())
```
(In my actual use case, the computation is much more expensive than a simple summation: some computation can begin as soon as one file is available, later steps depend on multiple files being available, and the final steps need all files.)
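To make the cost of idle waiting concrete, here is a toy model. It is an idealisation, not how Dask's scheduler is actually implemented: each file appears at a known time, every task computes for a fixed `work` seconds once its file exists, and polling is treated as instantaneous (the `wait=5` interval is ignored). The function names are hypothetical. In the "blocking" variant a started task holds its worker while it waits, as with `time.sleep` under the standard scheduler; in the "released" variant waiting costs no worker, which is roughly what `secede()`/`rejoin()` achieves, and work is dispatched in order of file arrival.

```python
import heapq


def finish_times_blocking(arrivals, order, workers, work):
    """Toy model: tasks start in the given `order`; a started task holds its
    worker while waiting for its file, then computes for `work` seconds.
    Returns {task index: finish time}."""
    free = [0.0] * workers  # heap of times at which each worker becomes free
    finish = {}
    for i in order:
        start = heapq.heappop(free)            # earliest available worker
        done = max(start, arrivals[i]) + work  # idle wait, then compute
        finish[i] = done
        heapq.heappush(free, done)
    return finish


def finish_times_released(arrivals, workers, work):
    """Same toy model, but waiting does not occupy a worker, so work is
    dispatched in order of file arrival."""
    free = [0.0] * workers
    finish = {}
    for i in sorted(range(len(arrivals)), key=lambda k: arrivals[k]):
        start = max(heapq.heappop(free), arrivals[i])  # worker free AND file present
        finish[i] = start + work
        heapq.heappush(free, finish[i])
    return finish


# Files appear once per second; the scheduler happens to start the latest files first.
arrivals = [0, 1, 2, 3]
print(finish_times_blocking(arrivals, order=[3, 2, 1, 0], workers=2, work=2.0))
# {3: 5.0, 2: 4.0, 1: 6.0, 0: 7.0}
print(finish_times_released(arrivals, workers=2, work=2.0))
# {0: 2.0, 1: 3.0, 2: 4.0, 3: 5.0}
```

With two workers, the last result arrives at 7.0 s instead of 5.0 s, and task 0, whose file exists from the start, finishes last instead of first.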
Simulation script to create the files:
```python
# counterpart to test-dask-data-waiting-multi.py
import numpy as np
import time
import xarray as xr

N = 40
shp = (10, 10, 10)
for i in range(N):
    ds = xr.Dataset()
    ds["data"] = xr.DataArray(np.random.randint(0, 10, size=shp), dims=["z", "y", "x"])
    ds.to_netcdf(f"/tmp/test{i:d}.nc")
    time.sleep(1)  # one new file per second
```
Script output:
Waiting for /tmp/test38.nc
Waiting for /tmp/test36.nc
Waiting for /tmp/test2.nc
Waiting for /tmp/test20.nc
Waiting for /tmp/test33.nc
Waiting for /tmp/test5.nc
Waiting for /tmp/test4.nc
Waiting for /tmp/test37.nc
Found /tmp/test4.nc
Found /tmp/test5.nc
Found /tmp/test2.nc
Waiting for /tmp/test12.nc
Waiting for /tmp/test16.nc
Waiting for /tmp/test11.nc
Found /tmp/test11.nc
Found /tmp/test12.nc
Waiting for /tmp/test9.nc
Waiting for /tmp/test26.nc
Found /tmp/test9.nc
Waiting for /tmp/test22.nc
Found /tmp/test20.nc
Waiting for /tmp/test10.nc
Found /tmp/test10.nc
Waiting for /tmp/test8.nc
Found /tmp/test8.nc
Waiting for /tmp/test24.nc
Found /tmp/test16.nc
Waiting for /tmp/test31.nc
Found /tmp/test24.nc
Waiting for /tmp/test6.nc
Found /tmp/test6.nc
Waiting for /tmp/test21.nc
Found /tmp/test21.nc
Waiting for /tmp/test27.nc
Found /tmp/test22.nc
Waiting for /tmp/test14.nc
Found /tmp/test14.nc
Waiting for /tmp/test18.nc
Found /tmp/test18.nc
Waiting for /tmp/test1.nc
Found /tmp/test1.nc
Waiting for /tmp/test32.nc
Found /tmp/test27.nc
Waiting for /tmp/test3.nc
Found /tmp/test26.nc
Found /tmp/test3.nc
Waiting for /tmp/test19.nc
Waiting for /tmp/test39.nc
Found /tmp/test19.nc
Waiting for /tmp/test23.nc
Found /tmp/test23.nc
Waiting for /tmp/test7.nc
Found /tmp/test7.nc
Waiting for /tmp/test13.nc
Found /tmp/test13.nc
Waiting for /tmp/test17.nc
Found /tmp/test17.nc
Waiting for /tmp/test34.nc
Found /tmp/test32.nc
Found /tmp/test38.nc
Found /tmp/test37.nc
Found /tmp/test36.nc
Found /tmp/test39.nc
180.418
As we can see, the script starts by waiting for `test38`, `test36`, `test2` and `test20`. These tasks occupy workers while idly waiting for their files to appear, whereas the tasks that could start calculating with `test0`, `test1`, `test3`, etc. are waiting for a free worker even though their files are already available. With `dask.distributed`, I can solve this problem as follows:
```python
import dask
import dask.array as da
import numpy as np
import time
import netCDF4
import dask.distributed

N = 40
shp = (10, 10, 10)


@dask.delayed
def get_value(fn, max_tries=100, wait=5):
    print("Waiting for", fn)
    for _ in range(max_tries):
        try:
            nc = netCDF4.Dataset(fn, "r")
        except FileNotFoundError:
            # Leave the worker's thread pool while sleeping, so that another
            # task can use this slot; rejoin before doing real work again.
            dask.distributed.secede()
            time.sleep(wait)
            dask.distributed.rejoin()
        else:
            break
    else:
        raise TimeoutError("File failed to materialise")
    print("Found", fn)
    data = nc["data"][:]
    nc.close()
    return data


coll = None
for i in range(N):
    arr = da.from_delayed(get_value(f"/tmp/test{i:d}.nc"), shape=shp, dtype="float32")
    if coll is None:
        coll = arr
    else:
        coll += arr

if __name__ == "__main__":
    with dask.distributed.Client():
        print(coll.mean().compute())
```
with the corresponding script output:
Waiting for /tmp/test7.nc
Waiting for /tmp/test27.nc
Waiting for /tmp/test35.nc
Waiting for /tmp/test28.nc
Waiting for /tmp/test18.nc
Waiting for /tmp/test19.nc
Waiting for /tmp/test10.nc
Waiting for /tmp/test26.nc
Waiting for /tmp/test6.nc
Waiting for /tmp/test4.nc
Waiting for /tmp/test32.nc
Waiting for /tmp/test14.nc
Waiting for /tmp/test15.nc
Waiting for /tmp/test24.nc
Waiting for /tmp/test23.nc
Waiting for /tmp/test5.nc
Waiting for /tmp/test11.nc
Waiting for /tmp/test3.nc
Waiting for /tmp/test21.nc
Waiting for /tmp/test16.nc
Waiting for /tmp/test9.nc
Waiting for /tmp/test37.nc
Waiting for /tmp/test31.nc
Waiting for /tmp/test30.nc
Waiting for /tmp/test17.nc
Waiting for /tmp/test36.nc
Waiting for /tmp/test0.nc
Waiting for /tmp/test20.nc
Waiting for /tmp/test38.nc
Waiting for /tmp/test29.nc
Waiting for /tmp/test12.nc
Waiting for /tmp/test8.nc
Waiting for /tmp/test2.nc
Waiting for /tmp/test25.nc
Waiting for /tmp/test1.nc
Waiting for /tmp/test13.nc
Waiting for /tmp/test33.nc
Waiting for /tmp/test34.nc
Waiting for /tmp/test22.nc
Found /tmp/test4.nc
Found /tmp/test3.nc
Found /tmp/test0.nc
Found /tmp/test2.nc
Found /tmp/test1.nc
Found /tmp/test7.nc
Found /tmp/test6.nc
Found /tmp/test5.nc
Found /tmp/test9.nc
Found /tmp/test8.nc
Found /tmp/test10.nc
Found /tmp/test14.nc
Found /tmp/test12.nc
Found /tmp/test11.nc
Found /tmp/test13.nc
Found /tmp/test16.nc
Found /tmp/test18.nc
Found /tmp/test19.nc
Found /tmp/test15.nc
Found /tmp/test17.nc
Found /tmp/test21.nc
Found /tmp/test24.nc
Found /tmp/test23.nc
Found /tmp/test20.nc
Found /tmp/test22.nc
Found /tmp/test27.nc
Found /tmp/test26.nc
Found /tmp/test28.nc
Found /tmp/test29.nc
Found /tmp/test25.nc
Found /tmp/test32.nc
Found /tmp/test31.nc
Found /tmp/test30.nc
Found /tmp/test34.nc
Found /tmp/test33.nc
Found /tmp/test39.nc
Found /tmp/test35.nc
Found /tmp/test37.nc
Found /tmp/test38.nc
Found /tmp/test36.nc
179.849
but in my actual use case I am using the `rasterio` library to write a GeoTIFF image, and sadly `rasterio` does not work with `dask.distributed`.
The question is: with the standard (threaded) scheduler, is there any way to keep tasks that are idly waiting from occupying workers, or otherwise to get the tasks whose files are already available running as early as possible? I could get the same effect by using 40 workers, but that might affect other parts of my processing.
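For comparison, one way to get the "start as soon as the file exists" behaviour without relying on any Dask scheduler is to move the waiting out of the tasks entirely: a single dispatcher thread polls for the files and only submits work for files that already exist, so pool threads never sleep. The sketch below uses only the standard library (`concurrent.futures.ThreadPoolExecutor`); the function `process_when_available` and its parameters are hypothetical, and `func` stands in for the per-file step. It only covers the per-file stage; steps that combine several files would need extra bookkeeping that is not shown. Like `get_value` above, it treats "file exists" as "file is complete", which assumes the writer produces each file atomically (for example by writing to a temporary name and renaming it with `os.replace`).

```python
import os
import time
from concurrent.futures import ThreadPoolExecutor


def process_when_available(paths, func, max_workers=4, poll=0.5, timeout=500.0):
    """Hypothetical helper: call func(path) for every path, submitting each
    call only once the file exists, so no pool thread blocks on a missing file.
    Returns {path: func(path)}."""
    pending = list(paths)
    futures = {}
    deadline = time.monotonic() + timeout
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        while pending:
            # Submit work for every file that has appeared since the last poll.
            still_missing = []
            for path in pending:
                if os.path.exists(path):
                    futures[path] = pool.submit(func, path)
                else:
                    still_missing.append(path)
            pending = still_missing
            if pending:
                if time.monotonic() > deadline:
                    raise TimeoutError(f"{len(pending)} file(s) failed to materialise")
                time.sleep(poll)  # only the dispatcher thread sleeps
        # Leaving the with-block waits for all submitted work to finish.
    return {path: fut.result() for path, fut in futures.items()}
```

The submission order no longer matters here: even if the paths are listed latest-first, whichever files exist are processed first.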