OutOfMemory when merging multiple DataFrames! Help me optimize!

Hey :wave: !

I have the following problem: I want to join multiple DataFrames, but I always run out of memory! My goal is an operation like polars.concat(how='diagonal') or repeated outer joins.
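
For reference, here is roughly the behaviour I am trying to mimic in polars (a minimal made-up sketch; the real data has more columns):

    import polars as pl

    # Two toy frames with partially overlapping columns (values made up)
    a = pl.DataFrame({"id": [1, 2], "x": [10, 20]})
    b = pl.DataFrame({"id": [3], "y": [99]})

    # Diagonal concat stacks the rows and fills missing columns with nulls
    out = pl.concat([a, b], how="diagonal")
    print(out)  # 3 rows, columns id, x and y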

I have simplified my code to this:

    from pathlib import Path
    import dask.dataframe as dd

    trace_path = Path('x')  # placeholder path to the directory of span CSVs
    dfs = []
    for span_path in trace_path.iterdir():
        df = dd.read_csv(span_path)
        # repartition and set_index return new DataFrames, so reassign the result
        df = df.repartition(npartitions=1)
        df = df.set_index('id', sorted=True)
        dfs.append(df)

    master_df = dfs[0]
    for df in dfs[1:]:
        intersecting_columns = list(set(master_df.columns).intersection(set(df.columns)))

        master_df = master_df.merge(df, how='outer', on=intersecting_columns)
        master_df = master_df.repartition(npartitions=10)


    master_df.dask.visualize(filename='high-test.svg', optimize_graph=True)
    master_df.visualize(filename='low-test.svg', optimize_graph=True)
    master_df.to_csv("dask-out-*.csv")

My high-level graph (high-test.svg) looks like this:

Is there a better or more resource-friendly way of doing this? Am I missing something?

I am really confused and kind of desperate, but I am happy to join the Dask community :grinning:
Btw, I have 16 GB of RAM.

Edit:

It should be noted that my test data currently consists of 10 files of around 10 kB each. I only have one index column, and its value is always the same, so every data point matches every other data point and the result grows exponentially. I basically need every combination. Is this too demanding/dumb?

Hi @devLeitner, welcome to Dask discourse!

Could you elaborate on that, or provide an example dataset? I'm not sure I understand, but if you have a lot of rows with the same merging id, you might have a problem:

> In some cases, you may see a MemoryError if the merge operation requires an internal shuffle, because shuffling places all rows that have the same index in the same partition. To avoid this error, make sure all rows with the same on-column value can fit on a single partition.
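
To illustrate with a tiny made-up example (plain pandas here, but the row counts are the same for a Dask merge): if every row shares the same merge key, an outer merge degenerates into a cross join, so you get rows_left × rows_right output rows per key.

    import pandas as pd

    # Toy data: every row carries the same merge key
    left = pd.DataFrame({"id": [1, 1, 1], "a": [10, 20, 30]})
    right = pd.DataFrame({"id": [1, 1, 1, 1], "b": [1, 2, 3, 4]})

    merged = left.merge(right, how="outer", on="id")
    print(len(merged))  # 12 rows (3 * 4), not 3 + 4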

See the discussion here too: Memory Leakage on single worker on merged DataFrame (after task completion).

Yes, nearly every row has the same merging id, and I found out that this is the problem, haha.

I tried joining the datasets and the data grows exponentially: from 7 kB to 1.9 GB, and then I can't shuffle anymore, just as you said.

I gave up on my mission and won't be joining all these datasets anymore. Instead, I calculate averages, maxima, and minima and work with those values.
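
In case it helps someone later, the aggregation approach looks roughly like this (just a sketch; the trace/*.csv glob and the value column are placeholders for my real files and columns):

    import dask.dataframe as dd

    # Read all span files at once and reduce them to summary statistics
    # instead of merging everything row by row
    df = dd.read_csv("trace/*.csv")
    summary = df.groupby("id")["value"].agg(["mean", "max", "min"])
    print(summary.compute())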

But thank you for your answer!
