Hello everyone,
As I use Dask more and more, I have found a small resilience issue when running workers on AWS spot instances (preemptible instances that can be killed at any time).
Basically I have a very simple flow, in pseudo-code:
import dask.dataframe as ddf

# Suppose df is an existing pandas DataFrame
daskdf = ddf.from_pandas(df, npartitions=2000)
daskdf = daskdf.map_partitions(sample_function)
daskdf = daskdf.repartition(npartitions=500)
daskdf.to_parquet(s3_uri, compute=True)
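To make the flow concrete, here is a toy, pure-Python analogy of what the pipeline does per partition: split the data into partitions, apply a function to each one, then regroup into fewer partitions. This is only an illustration of the shape of the computation (the real pipeline uses dask.dataframe, and sample_function here is a placeholder):

```python
def make_partitions(items, npartitions):
    """Split items into roughly equal contiguous chunks."""
    size = -(-len(items) // npartitions)  # ceiling division
    return [items[i:i + size] for i in range(0, len(items), size)]

def sample_function(partition):
    # Placeholder for the real per-partition transformation.
    return [x * 2 for x in partition]

data = list(range(20))
parts = make_partitions(data, npartitions=4)    # like from_pandas(..., npartitions=4)
parts = [sample_function(p) for p in parts]     # like map_partitions(sample_function)
merged = make_partitions(sum(parts, []), 2)     # like repartition(npartitions=2)
```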
Now, it seems that the to_parquet function creates two kinds of tasks: to_parquet and store_to_parquet, the latter tying everything together (as per the comment in the code). If I understood correctly, workers will hold the results of the to_parquet computation in memory until all 500 partitions have been written.
This seems to be confirmed in the dashboard where I can see that results are held in memory.
But I have preemptible workers, so they hold those results in memory for some time and may be killed before the entire parquet file is finished. This means that my pipeline will write some files, "forget" about them, and recompute an entire part of the graph that it had already computed and saved.
I found a possible workaround by replacing the last method call with:
import os

import dask.distributed
from dask.distributed import as_completed

client: dask.distributed.Client = dask.distributed.get_client()

parquet_files = [
    d.to_parquet(
        os.path.join(path, f"part.{i}.parquet"),
        storage_options=storage_options,
        # write_index=False,
        # compute=False,
    )
    for (i, d) in enumerate(daskdf.to_delayed())
]
futures = client.compute(parquet_files)
for future in as_completed(futures):
    # Block on each result, then release it so the worker can drop it
    future.result()
    future.release()
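The release-as-you-complete pattern above can be illustrated with the standard library's concurrent.futures, which has an analogous as_completed: each result is consumed as soon as it is ready, so nothing has to be retained for the whole batch. This is a sketch with a dummy write_partition stand-in, not the actual Dask code:

```python
import concurrent.futures

def write_partition(i):
    # Stand-in for writing one parquet part; returns the "path" it wrote.
    return f"part.{i}.parquet"

written = []
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as pool:
    futures = [pool.submit(write_partition, i) for i in range(8)]
    for future in concurrent.futures.as_completed(futures):
        # Consume each result as soon as it is done; in dask.distributed,
        # future.release() additionally tells the scheduler it can be dropped.
        written.append(future.result())
```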
It seems to work so far, but I wanted to know if there is any downside to such an approach?
I can see that as the parquet parts are saved, the corresponding completed portion of the graph is released from memory, so maybe that's not a good thing. But since the results are already persisted to storage, I don't feel this is a big issue.
Are there also any alternative ways of doing this that I could use, so that I don't lose those intermediate computations?
Thank you in advance for your help and advice.