Hello!
I'm Jakub and I want to use Dask to calculate statistical distances/divergences on data larger than local memory. I wanted to start by creating a simple vector of values (as a dataframe with a single column) and calculating its mean. Unfortunately, this throws an out-of-memory error. The full code to reproduce the issue:
1. Dockerfile:
# syntax=docker/dockerfile:1
FROM ubuntu:22.04
RUN apt-get update && apt-get install -y python3.9 \
python3-pip git
RUN mkdir /experiments
WORKDIR /experiments
RUN pip3 install --upgrade pip
RUN pip3 install jupyter pandas==1.5.2 numpy==1.24.0 \
dask==2022.12.1 dask[distributed]==2022.12.1 \
pyarrow==10.0.1 fastparquet==2022.12.0 graphviz==0.20.1
EXPOSE 8888
RUN useradd -m jakubb --uid 1000
CMD ["jupyter", "notebook", "--allow-root", "--ip=0.0.0.0"]
2. Building the Docker image with:
docker build -f Dockerfile . -t 'dask:1.0'
3. Starting the container (note the memory limit):
docker run -p 8888:8888 -v ${PWD}:/experiments/ -m 1g dask:1.0
4. Opening a Jupyter notebook and running the experiment, 1st approach:
import numpy as np
import pandas as pd
import dask.dataframe as dd
# filenames that will be created
files = [f'vec_{i}.pq' for i in range(10)]
# create data in a loop; together the files are larger than available memory
# each file is about 230 MB
for file in files:
    pd.DataFrame({'a': np.random.normal(0, 1, int(3e7))}).to_parquet(file)
# read it with dask
df = dd.read_parquet(files)
# kills worker due to memory
df.a.mean().compute()
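As a sanity check on the partition layout (a quick sketch; I believe read_parquet creates one partition per file here, and that memory_usage(deep=True) and the .partitions accessor can be used like this on a dask dataframe):
# how many partitions did read_parquet create? (expected: one per file)
print(df.npartitions)
# approximate in-memory size of the first partition, in bytes (roughly 240 MB expected)
print(df.partitions[0].memory_usage(deep=True).sum().compute())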
5. Running the experiment, 2nd approach:
import numpy as np
import pandas as pd
import dask.dataframe as dd
# filenames that will be created
files = [f'vec_{i}.pq' for i in range(10)]
# create data in a loop; together the files are larger than available memory
# each file is about 230 MB
for file in files:
    pd.DataFrame({'a': np.random.normal(0, 1, int(3e7))}).to_parquet(file)
from dask.distributed import Client, LocalCluster
cluster = LocalCluster(
    n_workers=1,
    threads_per_worker=1,
)
client = Client(cluster)
# read it with dask
df = dd.read_parquet(files)
# kills kernel due to memory
df.a.mean().compute()
This outputs:
KilledWorker: Attempted to run task ('getitem-a1b6278da8c3568b5657f5146eacf39d', 1) on 3 different workers, but all those workers died while running it. The last worker that attempt to run the task was tcp://127.0.0.1:36799. Inspecting worker logs is often a good next step to diagnose what went wrong. For more information see https://distributed.dask.org/en/stable/killed.html.
What am I doing wrong? If I work on the partitions directly in a loop (for partition in df.partitions: partition.mean().compute()), I can calculate the mean of each partition and then take the mean of those means, but I would expect Dask to do this for me while optimizing memory and CPU usage. Is there a way to do this properly with Dask?
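For completeness, this is the manual fallback I mean, as a minimal sketch. It assumes all partitions have the same length (they do here, since every file holds 3e7 rows), so a plain mean of the per-partition means equals the overall mean; otherwise the partition means would have to be weighted by partition size:
import numpy as np
# compute the mean of each partition separately, so only one partition is in memory at a time
partition_means = []
for partition in df.partitions:
    partition_means.append(partition.a.mean().compute())
# with equal-sized partitions, the mean of the partition means is the overall mean
overall_mean = np.mean(partition_means)
print(overall_mean)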