Could not serialize object of type HighLevelGraph with Client + ome-tiff

Hi,

I’m getting the error Could not serialize object of type HighLevelGraph when loading (ome) tiff data in combination with a Client. Drilling down, the error seems to originate from TypeError: cannot pickle 'BufferedReader' instances.

I’ve found 3 other posts about this error message on this forum, without a clear solution for this case (see below).

This issue only occurs when specifying a Client (and implicitly a Cluster), and only when loading tiff image data via tiff.tozarr() followed by da.from_zarr(). A minimal reproducible example is included below. Any help is appreciated.

dask version 2025.4.1

import dask.array as da
import numpy as np
import zarr
from dask.distributed import Client
from skimage.transform import resize
from tifffile import TiffFile, TiffWriter


def save_ome_tiff(filename, data, npyramid_add=0):
    # Write a pyramidal OME-TIFF: the full-resolution image plus npyramid_add
    # downsampled levels stored as SubIFDs.
    size = np.array(data.shape[:2])
    with TiffWriter(filename, ome=True) as writer:
        for i in range(npyramid_add + 1):
            if i == 0:
                subifds = npyramid_add
                subfiletype = None
            else:
                subifds = None
                subfiletype = 1    # reduced-resolution subfile
                size //= 2
                data = resize(data, size)
            writer.write(data, subifds=subifds, subfiletype=subfiletype)


if __name__ == "__main__":
    data = np.ones(shape=(512, 1024)).astype(np.float32)
    filename = 'test.ome.tiff'
    save_ome_tiff(filename, data, npyramid_add=4)

    store = TiffFile(filename).aszarr(multiscales=True)
    group = zarr.open_group(store=store, mode='r')
    # using group.attrs to get multiscales is recommended by cgohlke
    paths = group.attrs['multiscales'][0]['datasets']
    data = [group[path_dict['path']] for path_dict in paths]
    dask_data = da.from_zarr(data[0])

    with Client(processes=False) as client:    # without Client it works fine
        value = np.mean(dask_data)
        result = value.compute()    # error!
        print(result)

(Other potentially related posts for completeness)

When working with Client/Scheduler/Workers (e.g. clusters), information is exchanged between processes. In order to exchange Python objects, they are serialized to bytes using pickle.

I suspect that the TiffFile object is opening a file at some point, and the resulting BufferedReader Python object cannot be pickled and sent between processes. You probably cannot change that last part.
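A quick illustration of that last point, independent of tifffile (the filename is just the test file from above; any open file handle behaves the same):

import pickle

# An open file is backed by a BufferedReader, which pickle refuses to serialize;
# once such a handle ends up in the task graph, the whole graph fails to serialize.
with open('test.ome.tiff', 'rb') as fh:
    try:
        pickle.dumps(fh)
    except TypeError as exc:
        print(exc)    # e.g. "cannot pickle 'BufferedReader' ..."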

This would mean that TiffFile (or its aszarr implementation, because that is what you are actually using, not tozarr) is not Dask or multiprocessing ready. Ideally, the file opening and partial reads should occur on the Worker side, not the Client side, which is what is happening here.

I’m not sure what you are really trying to achieve here, but I can try to share some thoughts:

  • you could convert your tiff to a real Zarr file if it is not too big. Dask is then able to properly read Zarr in a distributed way (a minimal sketch follows this list).
  • otherwise you need to find a library, or define some functions, to read this tiff file on the Worker side in a chunked way. In the geospatial domain, we have rasterio, and open_rasterio in Xarray.
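A minimal sketch of the first suggestion, assuming the test.ome.tiff file created above and an arbitrary 256×256 chunk size (not code from this thread):

import dask.array as da
import tifffile

# One-off conversion: read the full-resolution image into memory and write it to a
# Zarr store on disk. Workers then open the store themselves, so nothing
# unpicklable ends up in the task graph.
image = tifffile.imread('test.ome.tiff')
da.from_array(image, chunks=(256, 256)).to_zarr('test.zarr', overwrite=True)

dask_data = da.from_zarr('test.zarr')
print(dask_data.mean().compute())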

Thank you @guillaumeeb,
I’m trying to load a tiff as a dask array (directly). I’m aware there’s a lazy/delayed way of doing this, but that loads whole tiff file(s) at a time, instead of taking advantage of selectively reading the required tiff tiles, which tif.aszarr() supports. (To be clear, I’m referring to the internal tiling that tiff files inherently support, as opposed to separate tile files.)
I imagined there would be a way to do this that dask supports. I’ll explore the Xarray loader you mention. Then again, it may not be entirely essential to do it this way. It would be interesting to measure the overhead of sourcing data this way as opposed to lazily loading entire tiff file(s) with delayed.

Converting the source data to zarr is possible, but as these are usually very large datasets I would ideally like to support ome-tiff directly as well. However, if it is more time-efficient to include an initial conversion, then it would be worth the (temporary) data duplication.

Also, please forgive my ignorance of dask’s internal processing. I assumed that scheduling always occurred, not only when using a Client/Cluster. I actually don’t want to use processes, only threads in a single process, but I guess dask serialises regardless?
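(As an aside, not from the original thread: without a Client, Dask uses its local threaded scheduler, which runs in-process and does not need to pickle the task graph; a distributed Client serializes the graph even when processes=False. A minimal sketch, reusing the store setup from the example above:)

import dask.array as da
import zarr
from tifffile import TiffFile

# The default threaded scheduler stays in-process: the graph is never pickled,
# so the BufferedReader held by the tiff-backed zarr store is harmless here.
store = TiffFile('test.ome.tiff').aszarr(multiscales=True)
group = zarr.open_group(store=store, mode='r')
path = group.attrs['multiscales'][0]['datasets'][0]['path']
dask_data = da.from_zarr(group[path])
print(dask_data.mean().compute(scheduler='threads'))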

In addition to da.from_zarr(TiffFile.aszarr()), I also tested:

  1. da.from_array(TiffFile().asarray())
  2. delayed(TiffFile().asarray())
  3. delayed(tifffile.imread)

Option 3, delayed(tifffile.imread), is compatible with Client(), and gives performance equal to the initial aszarr approach. I can also confirm that none of these options loads all of the data into memory.

Code:
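A minimal sketch of option 3 (not the exact code from this post; the shape and dtype passed to da.from_delayed are assumptions taken from the test file above):

import dask
import dask.array as da
import numpy as np
import tifffile
from dask.distributed import Client

# tifffile.imread is wrapped in dask.delayed, so only the filename is serialized
# and the file is opened inside the task that actually runs.
lazy_image = dask.delayed(tifffile.imread)('test.ome.tiff')
dask_data = da.from_delayed(lazy_image, shape=(512, 1024), dtype=np.float32)

if __name__ == "__main__":
    with Client(processes=False) as client:
        print(dask_data.mean().compute())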

As long as you build a TiffFile object, this won’t work with Dask Distributed. So yes, option 3 is probably the correct approach. However, it is not clear to me what you are trying to achieve; for this to be useful, you should read a part of the file within each Dask task.
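To illustrate that last point (a sketch under assumptions, not code from the thread): open the tiff inside each task and read only the requested region of the full-resolution level. The 256-pixel-wide blocks are purely illustrative:

import dask
import dask.array as da
import numpy as np
import zarr
from tifffile import TiffFile


def read_block(filename, y_slice, x_slice):
    # Open the file on the worker, read only the requested region, then close it;
    # nothing unpicklable ends up in the task graph.
    with TiffFile(filename) as tif:
        group = zarr.open_group(store=tif.aszarr(multiscales=True), mode='r')
        path = group.attrs['multiscales'][0]['datasets'][0]['path']   # full-resolution level
        return np.asarray(group[path][y_slice, x_slice])


blocks = [
    da.from_delayed(
        dask.delayed(read_block)('test.ome.tiff', slice(0, 512), slice(x, x + 256)),
        shape=(512, 256), dtype=np.float32)
    for x in range(0, 1024, 256)
]
dask_data = da.concatenate(blocks, axis=1)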

So I guess the questions are: what is the shape of your file(s)? Why do you need Dask?

Thank you again for the insights on this.

In conclusion to this thread: da.from_zarr(TiffFile.aszarr()) should not be used; instead, use delayed(tifffile.imread) as the best-performing Dask-compatible solution. Marking that as the solution.

For the context of this thread, the reason for using Dask is to take advantage of multi-threaded parallel processing of data that is too big for memory.

That I understand, but I imagine you have a somewhat more complex solution that avoids reading the same chunks of data in every delayed task?