Best practice for safely overwriting lazy-loaded data

Dear Dask developers, I am one of the core developers of a library for representing and manipulating spatial molecular data in Python (GitHub - scverse/spatialdata: An open and universal framework for processing spatial omics data), which depends on Dask. I would like to discuss best practices for safely overwriting lazy-loaded data using Dask. In particular, I would like to know whether a recommended workflow and/or implementation already exists, and if not, I would kindly ask for feedback on our planned implementation.

Our problem

In the library we use Dask to lazily read Zarr and Parquet datasets, which we represent in memory as Dask-backed Xarray objects and Dask dataframes respectively. We want to provide the user with a write() function whose parameter overwrite=True allows them to replace the on-disk storage of any object that appears in the computational graph used to lazily compute the object being written (I will show an example of this below).
Our requirement is that the function must be safe against any kind of data loss, taking into account the fact that the write() call may get interrupted (for example, by running out of disk space or by killing the process that is writing).

Looking at the documentation it seems that this functionality is not currently provided (to_parquet() has a parameter overwrite=True, but the docs say “NOTE: overwrite=True will remove the original data even if the current write operation fails. Use at your own risk.”).
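For concreteness, this is roughly the existing option referred to above (a minimal sketch; the paths are placeholders):

```python
import dask.dataframe as dd

df = dd.read_parquet("/path/to/input")  # lazy read

# Existing behaviour: the target directory is removed before the new files
# are fully written, so an interrupted write can leave neither the old nor
# the new data on disk.
df.to_parquet("/path/to/output", overwrite=True)
```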

Is there a recommended workflow/implementation that you suggest to perform this operation safely?
If not, below is our proposed design to achieve a safe implementation, under reasonable assumptions.

Our design proposal

Assume that we want to write an object x to disk. x depends on y and z (say x = f(y, z)), and both y and z are lazy-loaded. We want to write x to replace y.
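To make the setup concrete, here is a minimal sketch of such a graph (the paths and the join column are hypothetical); note that x keeps referencing the on-disk data of y and z until it is actually computed:

```python
import dask.dataframe as dd

# y and z are lazy: their task graphs point at the files on disk
y = dd.read_parquet("/path/to/y")
z = dd.read_parquet("/path/to/z")

# x = f(y, z) is also lazy, so materialising or writing x reads y and z from disk
x = y.merge(z, on="cell_id")

# Naively deleting /path/to/z before x is materialised would therefore
# destroy data that x still needs.
```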

Further requirements.
a) Our process uses an intermediate directory that is local and known to the user; let’s call this directory intermediate_storage. The user is aware of this directory and should never write to it directly. Still, if something goes wrong, the user knows that the data can be found there.
b) The process described is designed for single-threaded operation, not for multiple concurrent writers.

This is how the write operation would be implemented.

  1. The user calls the function write(x, des="/path/to/z")
  2. We first generate a UUID and write x to intermediate_storage/{UUID}
  3. We delete /path/to/z if it exists
  4. We copy intermediate_storage/{UUID} to /path/to/z
  5. We delete intermediate_storage/{UUID}
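
A minimal sketch of these steps for the Parquet case (the function signature and directory names are placeholders, not a final API):

```python
import shutil
import uuid
from pathlib import Path

INTERMEDIATE_STORAGE = Path("/path/to/intermediate_storage")  # local, known to the user

def write(x, des: str) -> None:
    """Write the lazy dataframe x to des, safely overwriting any existing data."""
    des = Path(des)
    tmp = INTERMEDIATE_STORAGE / str(uuid.uuid4())

    # Steps 1-2: fully materialise x in the intermediate location first.
    x.to_parquet(str(tmp))

    # Step 3: only now remove the old data at the destination.
    if des.exists():
        shutil.rmtree(des)

    # Step 4: copy the new data into place.
    shutil.copytree(tmp, des)

    # Step 5: clean up the intermediate copy.
    shutil.rmtree(tmp)
```

Between steps 3 and 4 the only full copy of the data is the one in intermediate_storage, which is why the user must never touch that directory.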

Safety considerations

In the above implementation the first two steps are completely safe, since nothing is deleted. After that, provided the assumptions are respected (intermediate_storage is not written to directly by the user, it is local, and only one thread is writing the data), there should always be a full copy of the data on disk: either in the original location, in the intermediate_storage folder, or in the target location.

I say “should” because the above is not a mathematical proof, and in principle there could be edge cases, such as non-conventional file systems or hidden bugs in one of the libraries we depend on, that could trigger unpredictable behavior.

Therefore, the documentation would transparently explain the process above and would make clear that the final responsibility for considering the implications of the write() API rests with the user. In the end, once finalized (i.e. after peer review and after adding tests), the above implementation would be very unlikely to lead to data loss, and if users follow common safety practices (e.g. keeping the raw data raw and keeping backups), I believe that the risk of data loss is no higher than when performing any other data analysis operation based on open-source software.

Conclusions

In the case of our library, multiple users have asked for this behavior, and more generally I expect the problem I described to be shared by many Dask users. Therefore, I believe that the community would benefit from a tested, documented and open-source implementation of a safe overwrite operation, and that this would be more beneficial than having each user reimplement it on their own.

Hi @LucaMarconato,

Just to be sure, are you talking about updating just part of the dataset, or is replacing the whole file enough? Based on the rest of your post, I assume replacing the whole file is OK.

As those file formats were designed to be immutable, the workflow I would suggest is close to the one you’re proposing: just write in another location and move the data once you are happy with the result.

I’m not sure about Parquet, but I think there has been work on being able to append to or even modify just part of Zarr arrays; at least this should be feasible.
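For example, plain zarr already allows modifying a region of an existing array in place (a sketch; the store path and array shape are hypothetical):

```python
import numpy as np
import zarr

# open an existing store for reading and writing
z = zarr.open("/path/to/array.zarr", mode="r+")

# only the chunks touched by this slice are rewritten
z[0:100, :] = np.zeros((100, z.shape[1]))
```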

In the example you provide, I think you are replacing z.

How is this directory chosen? How is the user made aware of it?

Why do you say it’s designed for single-threaded use? This is a strong limitation for big-data file formats.

In steps 4 and 5, copying the data can be expensive; a copy is unavoidable on object storage, but moving the data should be considered on POSIX file systems.
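For illustration, when intermediate_storage and the destination live on the same POSIX file system, step 4 can be a rename rather than a copy (a sketch; paths are placeholders, and it assumes the destination was already removed in step 3):

```python
import os

src = "/path/to/intermediate_storage/UUID"
dst = "/path/to/z"  # already removed in step 3

# On object storage there is no rename, so a full copy
# (e.g. shutil.copytree(src, dst)) is required.
# On a POSIX file system, if src and dst are on the same file system,
# a rename only updates the directory entry and no data is rewritten:
os.replace(src, dst)
```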

Well, I don’t think there is a much better solution for safely overwriting Parquet and Zarr datasets. The solution is essentially: write somewhere else, and replace the dataset after that. I don’t think implementing this on the Dask side is really useful, because the optimal way to do it will depend on the storage infrastructure, the data volume, and probably other user considerations.

cc @martindurant.

There are a number of interesting intersecting ideas here. Some thoughts follow.

  • any write-and-move scheme is inherently non-atomic, but should at least always produce valid datasets
  • fsspec’s transaction construct is a really simple way to write to one place and then finalise all the writes in a short time by moving (or whatever the backend implementation is); see the sketch after this list. This is single-threaded, and interruption during finalise would still be problematic.
  • there are technologies built on parquet to allow transactional updates, e.g., deltalake and iceberg. The former of these has dask integration, the latter some primitive dask hooks.
  • zarr does expect chunk-wise updates, via a read-amend-rewrite pattern (so vulnerable to races), but this is not readily available via the dask.array or xarray APIs
  • earthmover’s product “arraylake” overlaid on zarr provides git-like commit transactions
  • tiledb is zarr-like in concept with internal data deltas. Again, dask.array does interface with tiledb, but I don’t think it easily surfaces that functionality.
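
As an illustration of the fsspec transaction mentioned above (a sketch on the local filesystem; how the finalise step is implemented, and whether it is atomic, depends on the backend):

```python
import fsspec

fs = fsspec.filesystem("file")

# Files written inside the transaction go to temporary locations and are only
# moved to their final paths when the block exits without an exception;
# if an error is raised, the pending files are discarded instead.
with fs.transaction:
    with fs.open("/path/to/output/part.0.parquet", "wb") as f:
        f.write(b"...")  # placeholder payload
```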