Cannot calculate simple .mean() on dask.dataframe larger than RAM

Hello!

I’m Jakub and I want to use Dask to calculate statistical distances/divergences on data larger than local memory. I wanted to start by creating a simple vector of values (a dataframe with a single column) and calculating its mean. Unfortunately, this fails with an out-of-memory error. The full code to reproduce the issue:

1. Dockerfile:

# syntax=docker/dockerfile:1
FROM ubuntu:22.04

RUN apt-get update && apt-get install -y python3.9 \
    python3-pip git
	

RUN mkdir /experiments
WORKDIR /experiments

RUN pip3 install --upgrade pip 
RUN pip3 install jupyter pandas==1.5.2 numpy==1.24.0 \
    dask==2022.12.1 dask[distributed]==2022.12.1 \
    pyarrow==10.0.1 fastparquet==2022.12.0 graphviz==0.20.1

EXPOSE 8888

RUN useradd -m jakubb --uid 1000

CMD ["jupyter", "notebook","--allow-root", "--ip=0.0.0.0"]

2. Building the Docker image with:

docker build -f Dockerfile . -t 'dask:1.0'

3. Starting the container with (note the memory limit):

docker run -p 8888:8888 -v ${PWD}:/experiments/ -m 1g dask:1.0

4. Opening a Jupyter notebook and running an experiment, 1st approach:

import numpy as np
import pandas as pd
import dask.dataframe as dd

# filenames that will be created
files = [f'vec_{i}.pq' for i in range(10)]

# create files in a loop that together are larger than the available memory
# each file is about 230 MB
for file in files:
    pd.DataFrame({'a':np.random.normal(0,1, int(3e7))}).to_parquet(file) 


# read it with dask
df = dd.read_parquet(files)

# kills the kernel due to memory (no distributed workers are used here)
df.a.mean().compute()

5. Running an experiment, 2nd approach:

import numpy as np
import pandas as pd
import dask.dataframe as dd

# filenames that will be created
files = [f'vec_{i}.pq' for i in range(10)]

# create files in a loop that together are larger than the available memory
# each file is about 230 MB
for file in files:
    pd.DataFrame({'a':np.random.normal(0,1, int(3e7))}).to_parquet(file) 


from dask.distributed import Client, LocalCluster

cluster = LocalCluster(
    n_workers=1,
    threads_per_worker=1,
)
client = Client(cluster)

# read it with dask
df = dd.read_parquet(files)

# kills the worker due to memory (see the KilledWorker error below)
df.a.mean().compute()

outputs:

KilledWorker: Attempted to run task ('getitem-a1b6278da8c3568b5657f5146eacf39d', 1) on 3 different workers, but all those workers died while running it. The last worker that attempt to run the task was tcp://127.0.0.1:36799. Inspecting worker logs is often a good next step to diagnose what went wrong. For more information see https://distributed.dask.org/en/stable/killed.html.

What am I doing wrong? If I work on the partitions directly in a loop (for partition in df.partitions: partition.mean().compute()), I can calculate the mean of each partition and then the mean of means, but I would expect Dask to do this for me while optimizing memory and CPU usage. Is there a way to do this properly with Dask?
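
As a side note on the per-partition workaround: a plain mean of means is only exact when every partition has the same number of rows, which happens to hold here (3e7 rows per file). A minimal sketch of a combination that also works for unequal partitions is to keep a (sum, count) pair per partition and divide once at the end. The NumPy arrays below are hypothetical stand-ins for partitions, and `combine_partition_stats` is an illustrative helper, not a Dask function.

```python
import numpy as np


def combine_partition_stats(stats):
    """Combine (sum, count) pairs, one per partition, into a global mean."""
    total = 0.0
    count = 0
    for part_sum, part_count in stats:
        total += part_sum
        count += part_count
    if count == 0:
        raise ValueError("cannot compute the mean of zero rows")
    return total / count


rng = np.random.default_rng(0)

# Hypothetical stand-ins for partitions of deliberately unequal size.
partitions = [rng.normal(0, 1, n) for n in (1000, 10, 500)]

# Only one partition needs to be in memory while its pair is computed.
stats = [(float(p.sum()), p.size) for p in partitions]

global_mean = combine_partition_stats(stats)

# For comparison: the unweighted mean of means, which is biased
# whenever partition sizes differ.
mean_of_means = float(np.mean([p.mean() for p in partitions]))
```

With the loop from the question, each pair would presumably come from something like partition.a.sum().compute() and len(partition), so that only one partition is loaded at a time.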

It seems that what made Dask perform badly here is the scale, or rather the lack of it. In the case described, I limited the RAM of the Docker container to 1 GB. I later read the docs on memory management, and some of the mechanisms are time-based, e.g. Dask checks the memory used by the process every 200 ms. If I have 1 GB of RAM and read files of hundreds of MB each with one or several processes, then I exceed the allowed memory before Dask notices. For example, if the limit is 90%, then one more partition read takes us from 90% to over 100%. So I tested a more realistic scenario: 8 GB RAM and a 90 GB vector (!), and it runs smoothly, keeping RAM at around 4 GB with a single worker. Can anyone confirm my theory or give another explanation?
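
The arithmetic behind this theory can be sketched roughly as follows. The fractions are what I understand to be the default worker memory thresholds in Dask (target 0.60, spill 0.70, pause 0.80, terminate 0.95), and the sketch assumes that -m 1g means 1 GiB and that the worker sees the whole container limit. It ignores the Python process itself and any overhead from reading Parquet, so the real headroom is smaller.

```python
# Rough memory budget for the 1 GB container from the experiment.
ROWS_PER_FILE = int(3e7)   # rows written to each Parquet file
BYTES_PER_FLOAT64 = 8      # np.random.normal returns float64 values

# In-memory size of one loaded partition (column 'a' only).
partition_bytes = ROWS_PER_FILE * BYTES_PER_FLOAT64

# Assumption: `docker run -m 1g` caps the container at 1 GiB.
memory_limit = 1 * 1024**3

# Assumed default worker memory fractions.
thresholds = {"target": 0.60, "spill": 0.70, "pause": 0.80, "terminate": 0.95}

# Share of the limit taken by a single partition (about 22%).
partition_fraction = partition_bytes / memory_limit


def partitions_until(fraction):
    """Whole partitions that fit below `fraction` of the limit."""
    return int(fraction * memory_limit // partition_bytes)


for name, fraction in thresholds.items():
    print(f"{name:>9}: {partitions_until(fraction)} partition(s) fit below {fraction:.0%}")
```

Since each partition is more than a fifth of the limit, a single extra read can jump across several thresholds at once, which is consistent with the behaviour described above.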

Hi @rabitwhte,

You’re probably right about your experiment. By default, Dask schedules data-loading tasks eagerly, so the memory error probably comes from Dask running too many loading tasks before it starts computing the mean on their results.

You can find an explanation of this in "Reducing memory usage in Dask workloads by 80%", and try the newly introduced mechanism for limiting memory usage!
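
As far as I know, the mechanism in question is scheduler-side queuing, controlled by the distributed.scheduler.worker-saturation config value. A minimal sketch of setting it from Python, assuming 1.0 is a reasonable value to try and that it is set before the cluster is created:

```python
import dask

# Assumption: this must be set before the scheduler starts
# (i.e. before LocalCluster / Client is created) to take effect.
dask.config.set({"distributed.scheduler.worker-saturation": 1.0})

# Read the value back to confirm what the scheduler will pick up.
saturation = dask.config.get("distributed.scheduler.worker-saturation")
print(saturation)
```

Depending on the installed version, queuing may already be enabled by default, in which case this only makes the setting explicit. The same value can also be supplied through the DASK_DISTRIBUTED__SCHEDULER__WORKER_SATURATION environment variable.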