Writing very large dataframes with a sorted index

Hi folks,
I’m confused about how to create, set the index on, and write out very large dataframes.

My current workflow is like this:

def create_df(file): 
    ....
    return my_pandas_df

futures = []
for file in files: 
    futures.append(client.submit(create_df, file))

ddf = dask.dataframe.from_delayed(futures)
ddf = ddf.set_index(other='my_index_column', drop=True, sorted=False)
ddf.to_parquet(path='data_dir.parquet', compute=True)

Here is a snapshot of my task graph using a fraction of the data:

Question:
This shows that the created dataframe is stored in memory before it’s written out. This clearly won’t scale when I use all the data (I’ve tried it as well and it just crashed). How do I make this workflow scale to larger amounts of data?

Note:

  1. I know to limit the size of the computation graph, and I will do this.
  2. I need the index sorted (because I potentially want to do joins etc. later).
  3. I’d also really like to understand what Dask is doing memory-wise under the hood (so any pointers in that direction would be great).

Thanks in advance.

Hi @RobertArbon,

First, I’m wondering about something: since you use Futures to create the initial chunks of the dataframe, Dask might not be able to stream the data. What I mean is that you submit the loading of all dataframe pieces at once as Futures and never release the results, so I guess Dask just executes all of them and keeps the results in memory.

I think you should use delayed for this kind of work, something like:

from dask import delayed

delayed_df = []
for file in files: 
    delayed_df.append(delayed(create_df)(file))
ddf = dask.dataframe.from_delayed(delayed_df)

This way, Dask should be able to choose when to compute the delayed objects.

The second thing is that, as you’re probably aware, set_index is very expensive on a distributed system. The Dask documentation has some tips on how to optimize it if it turns out to be the bottleneck.

Thanks for the reply!

  • The reason I’m using the Futures API is that I need to scatter an object to the workers executing create_df (I didn’t show this for clarity), as it can be quite large, e.g., a deep learning model or some large reference data.
  • I think you’re correct - my code as written does appear to store everything in memory (the Bytes stored just increases to 2X the size of the final dataframe).

I guess my question is actually two questions.

  1. How do I create and write out a large (> memory) dataframe while scattering data to each worker?
  2. When do I set the index: before or after writing it to disk? I guess there’s no way to sort_index on the fly, so if the dataframe is larger than memory it needs to be written to disk first, but I’m not sure.

Could you show an example of what you are doing? I’m not sure yet why it forces you to use the Futures API.

If we want to be sure, you could just test the code without the set_index call, and see how it goes. It might work a little better though, as set_index might act as a kind of barrier here.

I think I need more details about the scatter thing to give an appropriate answer. I guess this is not the scattered data you’re trying to write?

It would be great not to use futures tbh, I find it all a bit confusing…but this is my code (without extraneous detail here):

Essentially I have 10 - 100k files which I want to featurize. The features I want to create might use a deep learning model or some large reference data (20MB - 2000MB). I create those features using a worker class:

class Worker:
    def __init__(self):
        # Large object held by the worker (e.g. a deep learning model)
        self.model = a_large_model

    def transform(self, paths_to_files):
        all_results = []
        for path in paths_to_files:
            file = load(path)
            all_results.append(self.model.transform(file))
        return pd.DataFrame(all_results)

The analysis is performed in the function below: scatter the worker to each process, send a small batch of paths to the worker’s transform method and get a dataframe back (repeat until all files are exhausted), create the master dataframe, set the index, and write to disk.

def run_analysis(...):
    compute = Client(n_processes=10)

    worker_future = compute.scatter(worker, broadcast=True)

    results = []
    for batch in batches_of_files:   # create little batches of file_paths so compute graph stays small
        features_future = compute.submit(_process_batch,
                                         worker_future, batch,
                                         compute.resource_config.chunk_size)
        results.append(features_future)

    # Create the new dataframe and set the index
    ddf = dd.from_delayed(results)
    ddf = ddf.set_index(other='index', drop=True, sorted=False)
    assert ddf.known_divisions, "divisions must be known in new dataframe"
    ...   # optionally some joins and merges to other dataframes.
    ddf.to_parquet(path=str(dataframe_path), compute=True)

where _process_batch is:

def _process_batch(worker: BatchWorker, batch_of_file_paths) -> pd.DataFrame:
    return worker.transform(batch_of_file_paths)

Thanks for sharing some code. I may have missed something, but I think your solution is overcomplicated.

I built a toy example to illustrate a simple case:

import dask
from dask.distributed import Client
import numpy as np
import sys
import pandas as pd
import dask.dataframe as dd

client = Client(n_workers=4)
print(client)

big_array = np.random.rand(5 * 1024, 5 * 1024)
print(sys.getsizeof(big_array))  # about 200MB

delayed_array = dask.delayed(big_array)
# could use scattered_array = client.scatter(big_array, broadcast=True)

def my_super_func(array, i):
    return pd.DataFrame(index=pd.Index([i]), data=[np.mean(i * array)])

lazy_results = []
for i in range(100):
    lazy_results.append(dask.delayed(my_super_func)(delayed_array, i))

ddf = dd.from_delayed(lazy_results)
ddf.compute()

I hope this makes sense regarding your need. If I try to adapt your code, I would do something like this:

def transform(paths_to_files, model):
    all_results = []
    for path in paths_to_files:
        file = load(path)
        all_results.append(model.transform(file))
    return pd.DataFrame(all_results)

client = Client(n_workers=10)
delayed_model = dask.delayed(a_large_model)

results = []
for batch in batches_of_files:   # create little batches of file_paths so compute graph stays small
    features_delayed = dask.delayed(transform)(batch, delayed_model)
    results.append(features_delayed)

ddf = dd.from_delayed(results)

Does that make sense? You might even be able to remove a for loop by using bags instead of creating your batches of files, but that’s another subject and probably not needed.

Wow, thanks for that great reply. That does make sense.

The question then becomes: if ddf is larger than memory, what’s the best way of writing it out with an ordered index?

If I try to set the index before writing it out, presumably dask would need access to the whole dataframe? i.e., this wouldn’t work:

...
ddf = dd.from_delayed(lazy_results)
# won't work, ddf too large for memory. 
ddf = ddf.set_index(other='index', drop=True, sorted=False)
ddf.to_parquet(path='data_dir.parquet', compute=True)

Whereas this would work but has an extra write action:

...

ddf = dd.from_delayed(lazy_results)
# triggers compute of the dataframe. 
ddf.to_parquet(path='data_dir.parquet', compute=True)

# set index has access to whole dataframe now that it's written out.
ddf = dd.read_parquet(path='data_dir.parquet')
ddf = ddf.set_index(other='index', drop=True, sorted=False)
# write to a different directory (illustrative name) so the data being read is not overwritten
ddf.to_parquet(path='data_dir_sorted.parquet', compute=True)

Do you have any advice or thoughts?

There is a partial answer to this in the set_index documentation:

https://docs.dask.org/en/stable/generated/dask.dataframe.DataFrame.set_index.html

The documentation says it needs to pass over the data twice. This doesn’t mean it will load all the data in memory at the same time. So in theory, even the first code you propose should work.

However, it is probably much safer to write the data to disk once before setting the index: this avoids recomputing the data and potential spilling to disk. There are also probably some optimizations for the Parquet format that Dask will use.

In the end, I would advise you to just test, first at a smaller scale than your whole dataset, and see what happens!


OK, point taken. Thanks very much for your help. It’s good to know I’m not missing something really obvious (beyond what you’ve already pointed out about the delayed functions and scattering).

Thanks for your time and help.
