Improving pipeline resilience when using `to_parquet` and preemptible workers

Hello everyone,

As I use Dask more and more, I have found a small resilience issue when running workers on AWS spot instances (preemptible instances that can be killed at any time).

Basically, I have a very simple flow, in pseudocode:

    import dask.dataframe as ddf

    # Suppose df is an existing pandas DataFrame
    ...

    daskdf = ddf.from_pandas(df, npartitions=2000)
    daskdf = daskdf.map_partitions(sample_function)
    daskdf = daskdf.repartition(npartitions=500)
    daskdf.to_parquet(s3_uri, compute=True)

Now, it seems that the to_parquet function creates two kinds of tasks: to_parquet and store_to_parquet. The final one ties everything together (as per a comment in the code). If I understood correctly, workers will hold the results of the to_parquet computations in memory until all 500 partitions have been written.
This seems to be confirmed in the dashboard, where I can see that the results are held in memory.
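
For reference, one way to see that final task is to ask to_parquet for the lazy result instead of computing it right away (a small sketch; the exact task names are Dask internals and may vary by version):

    # Passing compute=False returns a single Delayed whose one final task
    # depends on every per-partition write task.
    delayed_write = daskdf.to_parquet(s3_uri, compute=False)
    delayed_write.visualize()  # requires graphviz; shows the aggregation task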

[Screenshot 2023-08-21 at 09.47.23: dashboard showing to_parquet results held in worker memory]

But I have preemptible workers, so they hold those results in memory for some time and may be killed before the entire parquet dataset has been saved. This means that my pipeline will save some files, ‘forget’ about them, and recompute an entire part of the graph that it has already computed and saved.

I found a possible workaround by replacing the last method call with:

    import os

    import dask.distributed
    from dask.distributed import as_completed

    # Write each partition as its own delayed pandas to_parquet call
    parquet_files = [
        d.to_parquet(
            os.path.join(path, f"part.{i}.parquet"),
            storage_options=storage_options,
            # write_index=False,
            # compute=False,
        )
        for (i, d) in enumerate(daskdf.to_delayed())
    ]

    client: dask.distributed.Client = dask.distributed.get_client()
    futures = client.compute(parquet_files)

    for future in as_completed(futures):
        # Block until this partition is written, then release the future
        # so the scheduler can forget the result immediately.
        future.result()
        future.release()
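
As far as I understand, dropping every reference to a future (e.g. del future, or letting it go out of scope) has the same effect as calling future.release() explicitly: once the client no longer holds a reference, the scheduler is free to forget the result. Since the corresponding parquet file is already safely written at that point, forgetting it costs nothing.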

It seems to work so far, but I wanted to know if there is any downside to such an approach?
I can see that as I save the parquet files, the entire completed portion of the graph is released from memory, so maybe that’s not a good thing. But since I am saving the results to storage, I don’t feel like this is a big issue.

Are there any alternative ways of doing this that I could use so that I don’t forget about those intermediate computations?

Thank you in advance for your help and advice :slight_smile:

Hi @hyenal, welcome to the Dask community!

Yes, you are totally right: until everything has been written to disk and that last single task has been computed, the to_parquet task results (which have a really small memory footprint) will be held in memory.

So yes, I also suspect that with preemptible workers, if they don’t shut down gracefully, this will probably trigger the recomputation of some results, which is not good, especially when writing to disk.

I’m not seeing any downside so far, as long as you know what you are doing. With preemptible workers, you’ll just need to stream and forget the tasks.
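
If you don’t need to iterate over the results at all, a minimal sketch of that stream-and-forget pattern could use fire_and_forget (assuming parquet_files is the list of delayed writes built above):

    from dask.distributed import fire_and_forget, get_client

    client = get_client()
    futures = client.compute(parquet_files)
    # Tell the scheduler to run these to completion even if nothing holds
    # a reference to them; each result is dropped as soon as its write
    # finishes.
    fire_and_forget(futures)

The trade-off is that you lose the ability to block on completion or inspect errors, which your as_completed loop gives you.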

cc @rjzamora @fjetter, you may have some things to say about the design decision leading to that last task that ties everything together, which doesn’t exist in to_csv for example.

The historical reason for the final “tie it all together” task is that we used to write a global _metadata file in that task by default. Therefore, in most cases, we actually needed that task to aggregate the various parquet footer metadata in one place before the global to_parquet operation was technically done.

Now that the default case does not write the global _metadata file, we can probably adjust the design here a bit.
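
For context, that behaviour is controlled by the write_metadata_file keyword of to_parquet (a sketch; check the signature for your Dask version):

    # With write_metadata_file=False (the modern default, per the above),
    # no global _metadata footer is aggregated; setting it to True
    # restores the old behaviour that required the final aggregation task.
    daskdf.to_parquet(s3_uri, write_metadata_file=False)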

Thanks a lot both for your (super fast) replies! :slight_smile:
I would be happy to contribute a PR to the dask repo if this is an interesting feature for the project. I have a rough idea of the implementation: I suspect that in the case where we want to release memory directly, we could just call .compute on the last map_partitions directly (instead of creating a fake task)? A hypothetical illustration of what I mean follows below.
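
Here is that illustration with dask.delayed (not the actual to_parquet internals, just the shape of the two designs):

    import dask
    from dask import delayed

    @delayed
    def write_partition(i):
        # stand-in for writing partition i to storage
        return None

    writes = [write_partition(i) for i in range(4)]

    @delayed
    def tie_together(results):
        # stand-in for the extra aggregation task to_parquet currently adds
        return None

    dask.compute(tie_together(writes))  # current design, roughly

    dask.compute(*writes)  # suggested: no final aggregation task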

Well, please do so! We warmly welcome new contributors, and I think this feature is interesting.

Thank you!

I have submitted an issue for now (link); in the issue I added a link to a PR I made on my own Dask fork. The PR does not solve the problem outright, but it gives more freedom to the user downstream (you can still tie everything to one task if you wish).

Feel free to upvote it if this is of interest :slight_smile: