No Space left on device error when merging

Hi all,

I have two very large datasets called Network (800MB) and SecondOrder (33GB) and want to perform a series of merges like this:

import pandas as pd
import dask.dataframe as dd

NetworkDD = dd.from_pandas(Network, npartitions=Network['NiuSup'].nunique())

NodesSharingSupplier = dd.merge(NetworkDD, NetworkDD, on='NiuSup').query('NiuCust_x != NiuCust_y')

### WORKS UNTIL HERE (BUT DOES NOT SAVE RESULTS IN CSV)

NodesSharingSupplier=NodesSharingSupplier.drop('NiuSup', axis=1)
NodesSharingSupplier=NodesSharingSupplier.drop_duplicates()


NodesSharingSupplier=NodesSharingSupplier.rename(columns={"NiuCust_x": "NiuSup", "NiuCust_y": "NiuCust"})

NodesSharingSupplier.to_csv("NodesSharingSupplier.csv")

SecondOrder=pd.read_csv("/home/francesco.serti/SecondOrder_line55.csv")
SecondOrderDD = dd.from_pandas(SecondOrder, npartitions=SecondOrder['NiuSup'].nunique())

SecondOrderDD_all = SecondOrderDD.merge(NodesSharingSupplier, on=['NiuCust','NiuSup'], how='left', indicator=True)
SecondOrderDD=SecondOrderDD_all.loc[SecondOrderDD_all._merge=='left_only',SecondOrderDD_all.columns!='_merge']

del SecondOrder
del NodesSharingSupplier

## HERE, IF I DO .compute(), I GET AN OSError: No space left on device
SecondOrderDD.to_csv('/home/francesco.serti/SecondOrderDD_line75.csv', single_file=True)

I know that Dask operates lazily. I tried both keeping all the computations lazy and then saving with .to_csv, and calling .compute() right away, but in either case (maybe due to temporary file generation?) I get a "No space left on device" error.
Is there a fix for this (e.g. forcing Dask not to generate temporary files)? I even ran it on a server and got the same error!

Hi @fede, welcome to Dask community!

A "No space left on device" error is usually a symptom of Dask spilling too much data to disk while performing a query that involves a shuffle. Dask tries to make room in memory by spilling data to disk so it can keep processing new partitions/rows. This can happen if, for some reason, your merge generates some really large partitions.

It’s a little hard to tell where it is coming from just by looking at your code, without a sense of what the data looks like, but merge and drop_duplicates can typically generate a lot of temporary data depending on your input datasets.
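One rough way to check for that, reusing the names from your code, is to look at the per-partition memory footprint of the intermediate result (note this still runs the merge, but only brings back one number per partition):

partition_sizes = NodesSharingSupplier.map_partitions(
    lambda df: df.memory_usage(deep=True).sum()  # bytes held by each partition
).compute()
print(partition_sizes.describe())  # a few partitions far larger than the rest suggest a skewed merge key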

I see that you’ve got a comment early on in your code saying "WORKS UNTIL HERE (BUT DOES NOT SAVE RESULTS IN CSV)". Does it mean the first two lines cannot be executed when saving the results to a file?

In any case, I would recommend going step by step here and watching the computation in the Dask Dashboard.
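If you are not already running a distributed scheduler, a minimal local setup along these lines gives you a Dashboard to watch (the worker count and memory limit below are only placeholders to adapt to your machine):

from dask.distributed import Client, LocalCluster

cluster = LocalCluster(n_workers=4, threads_per_worker=1, memory_limit="8GB")  # placeholders
client = Client(cluster)
print(client.dashboard_link)  # open this URL to watch tasks, memory usage and spilling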

Also, calling compute on the result, or writing it as a single CSV file as you are trying to do, means gathering all the data in one place.
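Letting Dask write one file per partition avoids concentrating everything on a single task; for example (the output pattern below is only a suggestion):

SecondOrderDD.to_csv('/home/francesco.serti/SecondOrderDD_parts/part-*.csv')  # one CSV file per partition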

You can disable spilling using Dask configuration, but I don’t think this will fix your problem; you’ll most likely get a MemoryError instead. How large is your /tmp partition?
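For reference, both the spill behaviour and the directory used for spill files can be changed through dask.config. This is only a sketch (the path is a placeholder), and it has to run before the cluster/workers are created:

import dask

# Point temporary/spill files at a disk with more room (placeholder path)
dask.config.set({"temporary_directory": "/path/with/more/space"})

# Or disable spilling entirely (you will likely hit a MemoryError instead)
dask.config.set({
    "distributed.worker.memory.target": False,
    "distributed.worker.memory.spill": False,
})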

Increase available space: make sure you have enough free space on your device or server to perform the operations. If disk space is depleted, you may need to delete unnecessary files or free up more space.
Use a temporary directory: you can configure Dask to use a different directory for temporary files, one with more available space. To do this, set the TMPDIR (Linux/Mac) or TMP/TEMP (Windows) environment variable to point to a path with more free space before you run your program.
Example for Linux/Mac:
export TMPDIR=/path/to/large/temp/directory
Example for Windows:
set TEMP=C:\path\to\large\temp\directory
Use compute=False: when you call .to_csv(), pass compute=False to delay the write. This returns a delayed task that you can compute later, once you are sure there is enough free space to save the result.
Example:
import dask
write_task = SecondOrderDD.to_csv('/home/francesco.serti/SecondOrderDD_line75.csv', single_file=True, compute=False)
dask.compute(write_task)
Split the data into smaller chunks: if your operation is too large, you can try splitting the data into smaller chunks and performing the operation in parts. This can help reduce temporary disk usage.
Example:
SecondOrderDD_parts = [SecondOrderDD.partitions[i].to_csv(f'part_{i}.csv', single_file=True) for i in range(SecondOrderDD.npartitions)]
You can then process each part separately and merge the results as needed.
I hope these suggestions help you deal with the space constraints when using Dask. Good luck! If you have any additional questions, feel free to ask.


Thank you both for the suggestions. @ChurchillBell, could you please elaborate more on the split-into-chunks strategy? How can I perform a merge on a dataset split into chunks, for instance?

Moreover, concerning the temp dir, is there a way to point it at a directory on the server? If so, should I set it in the terminal or via Jupyter?

Thank you again!