Memory limits reached in simple ETL-like data transformations

I’m trying to use Dask in a simple ETL-like scenario, from which I’ve extracted an exemplary code snippet (pasted below), running it on a local worker, mainly for experimental purposes. The task is one that should be doable with very little memory, but it keeps failing.

  • The input is loaded from a CSV file, some processing is applied (as can be seen in the code), and the result is stored back to another CSV file (or files).
  • The input file is 200MB, and on other systems this same process is doable with no more than 80MB of RAM (it can go even lower). In my test with Dask, even an 800MB memory limit is hit and it deadlocks.
  • I’ve tried using map_partitions() (a rough sketch of what I mean is included after the pipeline code below) - no big change.
  • I’ve tried setting single_file=False in to_csv() → it improved the memory usage, but not by a lot (it still hangs at 500MB).

Any ideas about what I am doing wrong, aside from the fact that this is not typical Dask usage? Thank you!

Here’s the “pipeline” code:

#!/usr/bin/env python

"""Dask based ETL pipeline
"""
import dask.dataframe as ddf
import dask as da
import dask.diagnostics as dd
from dask.distributed import Client, LocalCluster
import logging


def data_flow(df: ddf.DataFrame, output_names: list):
    # Path 1: Split and unite 
    split_dfs = [df.loc[df['id'] < 1000000000], df.loc[df['id'] > 2000000000]]

    united_df = ddf.concat(split_dfs,
                           interleave_partitions=True,
                           ignore_order=True)

    o1 = united_df.to_csv(output_names[0],
                          single_file=True,
                          index=False)
    
    # Path 2: Bloat the column
    col_large = df['large']
    df['large'] = col_large + col_large + col_large + col_large

    o2 = df.to_csv(output_names[1],
                   single_file=True,
                   index=False)
    
    return [o1, o2]


# Configuration
MAX_MEMORY = "800MB"
BATCH_SIZE = "8MB"
INPUT_FILE = "data/fake_data.csv"  # size: 200MB 
OUTPUT_FILES = ["data/out_united.csv", "data/out_bloated.csv"]
SCHEMA = {
    'id': int,
    'token': str,
    'small': str,
    'large': str}


if __name__ == "__main__":
    da.config.set({'distributed.worker.memory.spill': False})
    with da.config.set({'distributed.scheduler.worker-saturation': 1.1}):
        cluster = LocalCluster(processes=False,
                               n_workers=1,
                               memory_limit=MAX_MEMORY,
                               silence_logs=logging.ERROR)

    print(f"Dask version: {da.__version__}\nDashboard link: {cluster.dashboard_link}")
    with dd.Profiler() as prof, dd.ResourceProfiler(dt=.01) as rprof, Client(cluster):
        df = ddf.read_csv(INPUT_FILE,
                          dtype=SCHEMA,
                          sep=",",
                          blocksize=BATCH_SIZE)

        da.compute(data_flow(df, OUTPUT_FILES))
        
    dd.visualize([prof, rprof])
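
For reference, the map_partitions() variant I mentioned above looked roughly like this (a sketch, not the exact code I ran): the “bloat” step is applied per partition as plain pandas, everything else stays the same.

import pandas as pd

def bloat(part: pd.DataFrame) -> pd.DataFrame:
    part = part.copy()
    # same effect as large + large + large + large for a string column
    part['large'] = part['large'] * 4
    return part

bloated = df.map_partitions(bloat)
o2 = bloated.to_csv(OUTPUT_FILES[1], single_file=True, index=False)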

Hi @theJonan, welcome to Dask community.

First of all, using Dask Distributed is not cost free. A single Worker not doing anything already takes about 130MB of RAM on my machine. Dask enables you to do computations that are bigger than memory, but it is not designed to keep the memory footprint as low as possible. An 800MB memory limit is pretty low for a Worker.
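
If you want to check that number on your side, a minimal sketch (assuming psutil, which distributed already depends on) is to query the resident memory of a freshly started worker:

import psutil
from dask.distributed import Client, LocalCluster

def rss_mb() -> float:
    # resident set size of the worker process, in MB
    return psutil.Process().memory_info().rss / 1e6

cluster = LocalCluster(n_workers=1, processes=True)
with Client(cluster) as client:
    print(client.run(rss_mb))  # one entry per worker address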

Unfortunately, I cannot reproduce your code because it relies on external data. Do you have some code to generate this data?

Also, could you add the profiling somehow?

This still seems a bit weird. Were you able to identify the part of the code that consumes the memory, either by looking at the Dashboard or by trying the different Paths in your code separately?

So if I understand correctly, with single_file=False, the code works and the Worker is using 500MB?

I agree that ideally, at least with the first Path, you should consume only the Worker process memory plus a few blocks’ worth of memory.

Hi, @guillaumeeb - thanks a lot for such quick and deep response!

First - I knew that there would be some memory overhead, but didn’t know the number - ≈130MB is a nice reference to know. Thanks!

Also - I’ve tried running with only one path active, and the memory consumption is still quite high. It is, obviously, a bit higher on the “bloating” path.

I’ll paste a lot of information below, hoping to give you as much context as possible.

I’m not allowed to paste images (as a new user), so to provide some more profiling in text form: a successful run (no memory limit) has these statistics:

Time:13.04s | Max Mem: 1332.18MB | Max CPU: 342.40%
This result comes with this setting: 'distributed.scheduler.worker-saturation': 1.1

When I change it to 0.2, the result changes to:
Time:13.60s | Max Mem: 566.62MB | Max CPU: 202.60%

It is still quite high, and if I set a limit even at 600MB it sometimes hangs. To me it is strange that it hangs at all, because it is (intentionally) quite linear processing. I’ve tried making it embarrassingly parallel with single_file=False (and it does look like that on the graph), but it still hangs with lower limits.

This is the fake data generator that I made earlier:

#!/usr/bin/env python

"""A fake, random data generator
"""

import numpy as np
import argparse
import itertools
import json
from tqdm import tqdm
import string

MAX_INT = 2 ** 32
LETTERS = list(string.ascii_letters + string.digits + " \t!.")
LINE_END = "\r\n"


def gen_line(data_info: dict, delimiter: str) -> str:
    line = None
    for info in data_info.values():
        min = info["min"] if "min" in info else 0
        max = info["max"] if "max" in info else MAX_INT
        val = np.random.randint(min, max + 1) if min < max else min
        if info["type"] == "int":
            val = str(val)
        elif info["type"] == "str":
            val = "".join(np.random.choice(LETTERS, size=val))

        if line is not None:
            line += delimiter
        else:
            line = ""
        line += val

    line += LINE_END
    return line


def generate_data(fname: str, max_rows: int, max_size: int, delimiter: str, data_info: dict, encoding='utf-8'):
    if max_size is not None:
        max_size *= 1024

    # Either a row limit or a size limit (or both) may be given.
    row_iter = range(max_rows) if max_rows is not None else itertools.count()
    with open(fname, "wb") as f:
        line = (delimiter.join(data_info.keys()) + LINE_END).encode(encoding)
        total_size = len(line)
        f.write(line)
        f.flush()
        for _ in tqdm(row_iter, total=max_rows):
            line = gen_line(data_info, delimiter).encode(encoding)
            total_size += len(line)
            if max_size is not None and total_size > max_size:
                break

            f.write(line)
            f.flush()


if __name__ == "__main__":
    argp = argparse.ArgumentParser(
        description="Fake data generator in CSV format.")
    argp.add_argument('-s', '--size', required=False, type=int,
                      help="Maximum size in KB to be generated.")
    argp.add_argument('-r', '--rows', required=False, type=int,
                      help="Maximum number of rows to be generated.")
    argp.add_argument('-d', '--delimiter', type=str, default=",",
                      help="The path to json configuration.")
    argp.add_argument('-c', '--columns', required=True, type=str,
                      help="The path to json configuration.")
    argp.add_argument('-o', '--output', required=True, type=str,
                      help="The output filename.")

    args = argp.parse_args()
    if args.size is None and args.rows is None:
        print(f"Err: Either Rows or Size must be specified!")
        exit(-1)

    data_info = None
    with open(args.columns.strip()) as cf:
        data_info = json.load(cf)

    generate_data(args.output.strip(), args.rows, args.size,
                  data_info=data_info,
                  delimiter=args.delimiter)

This is the JSON configuration for this generator:

{
    "id": {
        "type": "int"
    },
    "token": {
        "type": "str",
        "min": 32,
        "max": 32
    },
    "small": {
        "type": "str",
        "min": 10,
        "max": 128
    },
    "large": {
        "type": "str",
        "min": 10,
        "max": 40000
    }
}

And this is how I’ve invoked the generator to produce 200MB of fake data:

$ ./gen_fake_data.py -r 10000 -c fake_data_info.json -o fake_data.csv -s 200000

This did become quite a big post, but I hope it provides all the necessary information.
Thank you!

I have a follow-up question, which would probably be better placed in the Dask Distributed section - please let me know and I’ll post it there.

I think it boils down to “How does the distributed Client handle the memory limit?”:

  1. It shouldn’t lead to a deadlock, because the processing is quite linear, and a memory allocation can wait (without blocking any prior operation) until some other block is freed.
  2. The current behaviour could be a symptom of some global lock (common to allocation and freeing?!) which could result in such a deadlock.
  3. I’ve tried the Dask distributed facilities with other libraries (like PyArrow and Polars) via da.delayed, and it does limit the memory consumption, but again - with a deadlock.

Btw, just as informative feedback - with proper batch reading and writing (employing da.delayed, da.persist, etc.; a rough sketch of the pattern follows the list below) for PyArrow and Polars, the statistics for a similar, 250MB input CSV file are:

  • PyArrow: Time:0.24s | Max Mem: 489.50MB | Max CPU: 124.10%
  • Polars: Time:0.65s | Max Mem: 962.07MB | Max CPU: 189.60%
  • Dask: Time:14.53s | Max Mem: 515.47MB | Max CPU: 181.70%
  • Pandas: Time:10.16s | Max Mem: 2471.66MB | Max CPU: 108.90%
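
Here is a rough sketch of the batched pattern I mean (the file names and the transform are hypothetical): each dask.delayed task reads one pre-split input chunk with PyArrow, transforms it, and writes it straight back out, so only a few chunks are in memory at any time.

import glob
import dask
import pyarrow.csv as pacsv

@dask.delayed
def etl_chunk(in_path: str, out_path: str) -> str:
    table = pacsv.read_csv(in_path)       # read one chunk only
    # ... apply the per-chunk transformation here ...
    pacsv.write_csv(table, out_path)      # write it straight back out
    return out_path

tasks = [etl_chunk(p, p.replace('in_', 'out_'))
         for p in sorted(glob.glob('data/chunks/in_*.csv'))]
dask.compute(*tasks)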

Please let me know if it would be a good idea to post this as a separate question (perhaps without the stats) in the Distributed section. Thanks!

First, thanks for the complete reproducer, I was able to run your code and reproduce the behavior you describe!

Some observations or remarks:

  • When I start the LocalCluster and open the Dashboard, I actually see that the Worker is already consuming 256MiB out of the 763MiB available.
  • Running the first path of your code makes the occupied memory go up to 612MiB. The memory stays occupied after the process as unmanaged memory; I’m not sure why yet.
  • But re-running the code does trigger memory cleaning and works fine.
  • I’m able to make the code run without stalling with an 800MB Worker memory limit by adding some other settings to the configuration (otherwise it stalled with single_file=True):
da.config.set({'distributed.worker.memory.pause': 0.95})
da.config.set({'distributed.worker.memory.target': 0.9})
  • Memory consumption looks like this:
  • But somehow, in this case it has to load all the data into memory before writing; no streaming is possible.
  • With single_file=False, Dask streams the writing, but somehow keeps partitions in memory (I’m not sure why), so it stalls when lowering the memory limit. This shouldn’t be related to your workflow, as the generated graph indeed looks embarrassingly parallel.

I think it is more on the Scheduler and Worker side. Tasks are just paused when a certain percentage of the memory limit is reached.

I agree with you, but this is not the case. Memory taken by previously read blocks stays occupied for some reason.

See my screen copy of the Dashboard above. No more blocks to write, no more memory, tasks are paused, and it is impossible to read more data.

If you have some questions on these results, yes please open a new Topic.

Thank you so much for this deep investigation!

I’ll repost parts of the main question in the Distributed channel, since it seems to belong there.

Regarding the configuration changes:

  • As far as I know, the distributed.worker.memory.target configuration is related to the spilling process, which I’ve intentionally disabled (da.config.set({'distributed.worker.memory.spill': False})). My goal for the entire experiment is to see how Dask manages the in-memory stream of data.
  • The other parameter, da.config.set({'distributed.worker.memory.pause': 0.95}), is a nice catch, although I suspect it just postpones the stall rather than removing the problem. (A combined sketch of how these thresholds fit together follows this list.)
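
For reference, this is how I understand these three thresholds fitting together (all are fractions of the worker memory limit; the values below are just illustrative, mirroring my intent to keep spilling off):

import dask

dask.config.set({
    'distributed.worker.memory.target': False,  # don't start spilling managed data
    'distributed.worker.memory.spill': False,   # never spill based on process memory either
    'distributed.worker.memory.pause': 0.95,    # pause task execution at 95% of the limit
})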

I was also wondering whether the problem could be one of unfreed memory; that’s why I suspected some global memory-management lock.

Thanks again. I’ll post an update here if I find a resolution to the issue.

(I’ve posted the question to the Distributed channel: Worker blocking on memory limit, despite the streaming-friendly pipeline process)

@theJonan I notice you’re using interleave_partitions=True. There are issues with its implementation that can cause all the input data to be loaded into memory at once.

You may also want to pass compute=False to to_csv. By default, to_csv will block and write the CSV when called. The way your code looks, it seems you’re expecting it to just return a delayed value, which you compute later.
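
Roughly, the shape I mean is something like this (an untested sketch based on your data_flow above; dropping interleave_partitions should be fine here since the divisions are unknown after read_csv):

def data_flow(df, output_names):
    split_dfs = [df.loc[df['id'] < 1000000000], df.loc[df['id'] > 2000000000]]
    united_df = ddf.concat(split_dfs)   # interleave_partitions dropped, per the issue above

    o1 = united_df.to_csv(output_names[0], single_file=True,
                          index=False, compute=False)

    col_large = df['large']
    df['large'] = col_large + col_large + col_large + col_large
    o2 = df.to_csv(output_names[1], single_file=True,
                   index=False, compute=False)

    return [o1, o2]   # delayed values; da.compute(...) later actually writes the files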


Thanks @gjoseph92 for pointing that out!

@theJonan you are right!

Clearly, Dask here doesn’t free the already processed blocks.

Thanks @gjoseph92 for the investigation!

I’ve tried both suggestions, but they don’t seem to improve the situation. However, I’m curious to understand why they are expected to.

Regarding to_csv(compute=False) - isn’t that postponing the actual freeing of the memory? My logic is that it is better to have the computation happen immediately (i.e. compute=True), because that would enable storing the data and freeing the memory. The fact that I return something is more a matter of me being new to Dask and using it to trigger the execution with da.compute().

interleave_partitions=True is a parameter that I don’t fully understand, but my idea was to make things as easy as possible for ddf.concat() - just put the incoming dataframes/chunks in as they arrive. The implications have to do with the index, I guess.

I was checking the code, and the problem seems pretty clear. When the memory limit is reached, the whole worker is paused (or killed in the case of a processes=True setup), along with all its threads. That rules out leaving some of the threads running so they can eventually free the memory.

The distributed.scheduler.worker-saturation configuration option should be relevant to such situations, but I guess it performs better when there are many workers. I didn’t see any per-thread-level code handling that scenario.

What @gjoseph92 is saying is that you don’t need da.compute if you’ve already called to_csv. But looking at your code, it seems you want your method to return delayed objects to be computed later on, hence the to_csv(compute=False). It’s not postponing the freeing, it’s postponing the entire computation.

Yes, you are right. But it should probably not reach this level of memory usage.

In other threads, we’ve been discussing the growth of unmanaged memory. Did you try the tips in these links:

https://distributed.dask.org/en/latest/worker-memory.html#memory-not-released-back-to-the-os
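
For reference, the manual trimming helper from that page looks roughly like this (Linux/glibc only; it asks the allocator to return free heap pages to the OS):

import ctypes

def trim_memory() -> int:
    libc = ctypes.CDLL('libc.so.6')
    return libc.malloc_trim(0)

client.run(trim_memory)   # run it on every worker of an existing Client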

Yes, I did check these documents and have read them several times - once more just now.

I see I missed mentioning that I’m developing on macOS, although I’m making test runs on Linux (both directly and in Docker), with similar results. Mentioning it now, in case it adds useful information.

Indeed, the unmanaged memory is quite large for most of the time the process is running. I was not able to bring it down with the suggested approaches, on either macOS or Linux.

As I mentioned earlier, it bothers me that I cannot see (in the Dask codebase) a mechanism for postponing only certain tasks in order to “save” memory. If that is the case, it is not possible to make it work the way I want.

Or it is quite possible that I’m doing something the wrong way. So, if I may reformulate the question:

Is it possible to make Dask process batches one by one, in order to save memory? If the actual transformations allow that, of course.

Well, this mechanism was added recently, and you are already using it, see:
https://distributed.dask.org/en/stable/scheduling-policies.html#avoid-over-saturating-workers

I tried your code again using automatic memory trimming, and I was able to make it run with a 512MiB memory limit and one worker.
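
By “automatic memory trimming” I mean the glibc setting from the page linked above; roughly, it amounts to something like this (Linux/glibc only; a value of 0 trims aggressively):

import os

# Must be set before LocalCluster(processes=True, ...) spawns its workers,
# so that the worker processes inherit it at startup.
os.environ['MALLOC_TRIM_THRESHOLD_'] = '0'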

See the profiling below:

It also makes the worker consume less memory in the first place - only 130MiB after start. I also launched the cluster with this code:

cluster = LocalCluster(processes=True,  # notice this line
                       n_workers=1,
                       memory_limit=MAX_MEMORY,
                       silence_logs=logging.ERROR)

Limiting the worker to only 4 threads (threads_per_worker=4) and lowering the worker saturation (da.config.set({'distributed.scheduler.worker-saturation': 1.0})), I was able to go down to 350MiB. Then, with 4MiB blocks, down to 300MiB.
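
Put together, the tuned setup looks roughly like this (a sketch using the values mentioned above):

import logging
import dask
import dask.dataframe as ddf
from dask.distributed import Client, LocalCluster

dask.config.set({'distributed.scheduler.worker-saturation': 1.0})
cluster = LocalCluster(processes=True,
                       n_workers=1,
                       threads_per_worker=4,
                       memory_limit='512MiB',
                       silence_logs=logging.ERROR)
client = Client(cluster)
df = ddf.read_csv('data/fake_data.csv',
                  dtype={'id': int, 'token': str, 'small': str, 'large': str},
                  blocksize='4MB')   # smaller blocks to lower peak memory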

You could maybe optimize things further for memory efficiency, but this also comes at a cost in performance (especially when using smaller block sizes).

In the end, there is still a bit of unmanaged memory left after the computation is done.

I’ve actually tried much lower saturation values - 0.2, 0.4. It did make a difference, but it didn’t bring it as low as 300MB.

I did manage to bring it to around 512MB, but if I set a memory limit of 500MB, or even 600MB, it will most probably hang, even though previous runs (without a limit) showed less consumption. Btw, on your graphic I see a peak consumption of ≈1GB - is that correct?

Setting processes=True on my side actually makes the memory consumption about 150MB higher (understandably), and the only difference is when a memory limit is imposed - the task doesn’t stall, but gets killed.

Yes, having fewer threads per worker lowers the memory consumption, as expected. But we’re quite far from what should be possible - this type of processing should comfortably run in around 100MB. For example, I’ve built the same pipeline in SSIS, and that process runs with a peak consumption of ≈80MB. Much slower, though.

As I’ve said, this is an experiment for me, so it isn’t blocking a real-world problem (luckily!). I was interested in Dask’s ability to juggle memory when resources are limited, and it seems to me it comes down to proper tuning and setup of the whole cluster (by the developer), with prior knowledge of the volumes of data to be processed. As we’ve discussed, it should not hang on a task that is (realistically) processable in a linear manner.

Thanks a lot for your efforts! I’ll post any updates here if I manage to make it work the way I’m looking for.

You are right, I’m not sure why. I just re-ran things with a 300MiB memory limit; here is the graphic generated:

But on the Dashboard, it never went over 300MiB, so I’m not sure why the memory scale is like that.

Hm, I just looked at the Dashboard - did you use some external tool?

Well, I guess you’ll never get that with Dask.

You’re welcome, please do so!