Memory issues arising from writing partitions with to_parquet

Hello everyone,

We are developing an open-source framework that provides different components for data
pipelines. We rely on dask as a backbone for efficiently managing large datasets.
Many of our operations work on pandas dataframes, which we parallelize over dask partitions via map_partitions.
We try to store the results in separate parquet files, one for each column.

Basically, our approach looks like the following:


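import dask
import dask.dataframe as dd

dataframe = dataframe.map_partitions(process_partition)

index_df = dataframe[["id"]]
column_df = dataframe[[column]]

# Build both writes lazily so they can share the upstream computation
write_tasks = [
    dd.to_parquet(index_df, output_path, compute=False),
    dd.to_parquet(column_df, output_path, compute=False),
]
# Assumed final step: trigger both writes together
dask.compute(*write_tasks)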

We have encountered an issue with this approach: the final to_parquet step seems to retain the entire
dataset in memory until the last partition has been computed,
which causes out-of-memory problems.

We are not sure whether there is a better approach for this task. Is there a
recommended way to compute the partitions one at a time, save each result directly
to parquet files, and release its memory promptly?

Thank you in advance, and your assistance is greatly appreciated. :slight_smile:

Hi @mrchtr, welcome to Dask community!

I’m a bit surprised by this; it should not be the case! In "Improving pipeline resilience when using `to_parquet` and preemptible workers", we identified that the task results from to_parquet were kept in memory until the last write, but this is only the metadata needed for the final step, not the data to write. The dataset should be cleared from memory chunk by chunk as the writes occur.

Maybe there’s a catch in this workflow I’m not seeing currently. Could you try to build a minimal reproducer?

Thank you for assisting me with this. I have created a minimal example that should help to reproduce it.

What it does is create a dummy dataframe with large string values per partition, allowing you to experiment with different dataframe sizes.

Furthermore, I have limited the LocalCluster to 1 worker, 1 thread, and 2 GB of memory per worker. So, my assumption was that if I created partitions smaller than 2 GB, map_partitions would handle them sequentially.

Unfortunately, we run into an out-of-memory issue because the first partition is computed, then the second one, and finally, everything is written to disk.

If you change the memory_limit_per_worker to 4GB in this example, the out-of-memory issue will not occur since both partitions will fit into memory.

The artificial delays and loops are primarily for simulating longer-running workloads. This makes debugging easier as well.



Source code:

import time  # needed by artificial_delay below

import pandas as pd
from dask.distributed import LocalCluster, performance_report
import dask.dataframe as dd
from distributed import Client

# Arguments
output_dir = './tmp'
num_workers = 1
thread_per_worker = 1
memory_limit_per_worker = 2e9 # 2Gb
num_thread_saving_parquets = 1

# Helper functions
def artificial_delay(milliseconds):
    end_time = time.time() + milliseconds / 1000
    while time.time() < end_time:
        _ = 2 + 2  # Perform a simple computation

def process_partition(df, gb_to_produce = 1):
    """Dummy method for processing partition. Large objects are created to simulate oom."""
    print("process partition")
    artificial_delay(10000) # Just needed to gain extra time for debugging
    counter = 0
    large_string = "t" * ((1024 * 1024 * 128)) # ~130Mb
    to_append = []

    while counter < (10 * gb_to_produce) :
        counter += 1
        to_append.append([counter, large_string])

    return pd.DataFrame(data=to_append, columns=["id", "column_a"])

if __name__ == "__main__":
    worker_kwargs = {
        "n_workers": num_workers,
        "memory_limit": memory_limit_per_worker,
        "threads_per_worker": thread_per_worker,
    }
    cluster = LocalCluster(**worker_kwargs)
    client = Client(cluster)

    with performance_report(filename="export.html"):

        dataframe = dd.from_dict({"id": [0, 1], "column_a": ["a", "b"]}, npartitions=2)

        dataframe = dataframe.map_partitions(process_partition, meta=dataframe)

        index_df = dataframe[["id"]]
        column_df = dataframe[["column_a"]]

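        write_tasks = [
            dd.to_parquet(index_df, output_dir, compute=False),
            dd.to_parquet(column_df, output_dir, compute=False),
        ]
        # Assumed final step (the post is cut off here): run both writes and block until done
        client.compute(write_tasks, sync=True)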

In the meantime, I have also considered a potential workaround. We could invoke dataframe.persist() and then iterate through the list of futures. If a future has completed, we can immediately write it to disk. This approach appears to work well, even with larger setups.

Pseudo code:

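from distributed import futures_of, wait

dataframe = dataframe.persist()
futures_list = futures_of(dataframe)

# Remember which partition each future belongs to
future_to_partition = {}
for partition_idx, future in enumerate(futures_list):
    future_to_partition[future] = partition_idx

while futures_list:
    # Block until at least one of the remaining partitions has finished
    completed, not_completed = wait(futures_list, return_when="FIRST_COMPLETED")

    for future in completed:
        partition_idx = future_to_partition.pop(future)
        pandas_df = future.result()
        # Write the finished partition right away (the file name is only illustrative)
        pandas_df.to_parquet(f"{output_dir}/part.{partition_idx}.parquet")

    # Keep waiting only on the partitions that are still running
    futures_list = list(not_completed)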

Please let me know if anything is unclear or omitted. Thank you very much! :slight_smile:

Hi @mrchtr,

> the final to_parquet step retains the entire dataset in memory until the last partition was computed

This is not correct; it retains up to 2 partitions in memory.

What you’re observing is called worker-side queueing.
By default, the scheduler will eagerly push more tasks to the worker than there are threads available. This way, as soon as a task is finished, the worker can immediately get busy on something else, without waiting for the round-trip to the scheduler to get instructions for the next step.
This has the downside that, given an embarrassingly parallel computation, a worker with N threads will be busy with up to N+1 pipelines.
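
To make the N+1 figure concrete, here is a small sketch of the arithmetic. It assumes the rule described in the distributed scheduling documentation, namely that the scheduler sends at most max(ceil(worker-saturation * nthreads), 1) tasks to a worker at a time. The helper name max_tasks_per_worker is made up for illustration and is not part of dask.

```python
import math


def max_tasks_per_worker(nthreads: int, worker_saturation: float) -> int:
    """Assumed upper bound on the tasks the scheduler hands to one worker at a time."""
    return max(math.ceil(worker_saturation * nthreads), 1)


# Compare the default (1.1) with the stricter setting (1.0)
for nthreads in (1, 4):
    for saturation in (1.0, 1.1):
        print(nthreads, saturation, max_tasks_per_worker(nthreads, saturation))
```

With one thread per worker, 1.1 rounds up to two tasks in flight, which matches the two partitions held in memory in the reproducer, while 1.0 gives exactly one.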

To disable this optimization, and thus reduce memory usage at the expense of increased latency, use

dask.config.set({"distributed.scheduler.worker-saturation": 1})

You can observe this behaviour if you look at http://localhost:8787/graph.
On http://localhost:8787/ you will also see that there are always 2 pipelines going on with the default worker-saturation of 1.1, and only 1 with worker-saturation=1.

In your case, however, the issue is that your code, applied to each partition, has a peak memory usage of 3.5 GiB. So no matter what, it will kill off a worker mounting only 2 GiB. The metrics show that worker-saturation doesn’t have much impact:

In the code below I increased the number of partitions to 10:

import dask
import dask.config
import dask.dataframe as dd
from distributed import Client
from distributed.diagnostics import MemorySampler

# index_df, column_df and output_dir come from the reproducer above
ms = MemorySampler()
dask.config.set({"distributed.scheduler.worker-saturation": 1})
with Client(n_workers=1, memory_limit="8 GiB", threads_per_worker=1):
    with ms.sample("worker-saturation=1", interval=0.1):
        dask.compute(
            dd.to_parquet(index_df, output_dir, compute=False),
            dd.to_parquet(column_df, output_dir, compute=False),
        )

dask.config.set({"distributed.scheduler.worker-saturation": 1.1})  # default
with Client(n_workers=1, memory_limit="8 GiB", threads_per_worker=1):
    with ms.sample("worker-saturation=1.1", interval=0.1):
        dask.compute(
            dd.to_parquet(index_df, output_dir, compute=False),
            dd.to_parquet(column_df, output_dir, compute=False),
        )

# Assumed last step: plot both runs on a shared time axis (requires matplotlib)
ms.plot(align=True)




Thanks, @crusaderky, for the detailed explanation! This has already improved my understanding of my issue.

There is indeed something in my code which leads to a memory peak.

Now, the question is where this memory peak comes from. In this scenario, I assume it may be related to writing a column containing large string values into parquet files.

Are there common best practices for handling columns containing large string values when writing to parquet files? Do you have any tips on how I can reduce this memory peak?

The simplest way to reduce heap use is to reduce partition size. If you had twice as many partitions with half as many rows each, your heap consumption would be halved.