I am using Dask to read a large .csv file, calculate summary statistics, and store them in Postgres. Later, I intend to perform preprocessing steps and train machine learning models (dask_ml). All of this works fine on a local cluster inside a Jupyter notebook; however, when I set up the Dask cluster inside Docker (my present use case), the out-of-memory compute fails and all workers are lost. Any suggestions on how to set up Dask inside Docker would be really helpful.
Here is my docker-compose.yaml (a few app details are hidden). I am expecting the Dask workers to spill out-of-memory data to the /shared directory:
version: "3"
services:
app:
container_name: "myapp"
build:
context: .
dockerfile: ./Dockerfile
---
volumes:
----
- shared:/shared/
restart: always
postgres_database:
--
mongo_database:
---
dask_scheduler:
container_name: "dask_scheduler"
image: myapp:latest
hostname: dask_scheduler
ports:
- "8786:8786"
- "8787:8787"
command: ["dask-scheduler"]
dask_worker_1:
container_name: "dask_worker_1"
image: myapp:latest
depends_on:
- dask_scheduler
volumes:
- shared:/shared/
command: ["dask-worker", "tcp://dask_scheduler:8786", "--memory-limit=1 GiB"]
dask_worker_2:
container_name: "dask_worker_2"
image: myapp:latest
depends_on:
- dask_scheduler
volumes:
- shared:/shared/
command: ["dask-worker", "tcp://dask_scheduler:8786", "--memory-limit=1 GiB"]
volumes:
pg_data:
mongo_data:
shared:
@SOUMYASHUKLA Welcome to Discourse! Could you please share the error traceback as well? It’ll help us diagnose the issue.
@pavithraes Thank you very much! Here is the traceback:
app | file_path=== /shared/test4gb.csv summary_blocksize= 128000000.0
app | in summary for dask computation....
dask_worker_2 | distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
dask_worker_1 | distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
dask_worker_2 | distributed.nanny - INFO - Worker process 100 was killed by signal 15
dask_scheduler | distributed.scheduler - INFO - Remove worker <WorkerState 'tcp://172.24.0.6:37543', name: tcp://172.24.0.6:37543, memory: 0, processing: 16>
dask_scheduler | distributed.core - INFO - Removing comms to tcp://172.24.0.6:37543
dask_worker_2 | distributed.nanny - WARNING - Restarting worker
dask_worker_1 | distributed.nanny - INFO - Worker process 100 was killed by signal 15
dask_scheduler | distributed.scheduler - INFO - Remove worker <WorkerState 'tcp://172.24.0.5:34833', name: tcp://172.24.0.5:34833, memory: 0, processing: 32>
dask_scheduler | distributed.core - INFO - Removing comms to tcp://172.24.0.5:34833
dask_scheduler | distributed.scheduler - INFO - Lost all workers
dask_worker_1 | distributed.nanny - WARNING - Restarting worker
dask_worker_2 | distributed.worker - INFO - Start worker at: tcp://172.24.0.6:40681
dask_worker_2 | distributed.worker - INFO - Listening to: tcp://172.24.0.6:40681
dask_worker_2 | distributed.worker - INFO - dashboard at: 172.24.0.6:37165
dask_worker_2 | distributed.worker - INFO - Waiting to connect to: tcp://dask_scheduler:8786
dask_worker_2 | distributed.worker - INFO - -------------------------------------------------
dask_worker_2 | distributed.worker - INFO - Threads: 16
dask_worker_1 | distributed.worker - INFO - Start worker at: tcp://172.24.0.5:37363
dask_worker_2 | distributed.worker - INFO - Memory: 1.00 GiB
dask_worker_2 | distributed.worker - INFO - Local Directory: /src/app/dask-worker-space/worker-_464h4d5
dask_worker_1 | distributed.worker - INFO - Listening to: tcp://172.24.0.5:37363
dask_worker_2 | distributed.worker - INFO - -------------------------------------------------
dask_worker_1 | distributed.worker - INFO - dashboard at: 172.24.0.5:33659
dask_worker_1 | distributed.worker - INFO - Waiting to connect to: tcp://dask_scheduler:8786
dask_worker_1 | distributed.worker - INFO - -------------------------------------------------
dask_worker_1 | distributed.worker - INFO - Threads: 16
dask_worker_1 | distributed.worker - INFO - Memory: 1.00 GiB
dask_worker_1 | distributed.worker - INFO - Local Directory: /src/app/dask-worker-space/worker-0dfblw0s
dask_worker_1 | distributed.worker - INFO - -------------------------------------------------
dask_scheduler | distributed.scheduler - INFO - Register worker <WorkerState 'tcp://172.24.0.6:40681', name: tcp://172.24.0.6:40681, memory: 0, processing: 32>
dask_scheduler | distributed.scheduler - INFO - Starting worker compute stream, tcp://172.24.0.6:40681
dask_scheduler | distributed.core - INFO - Starting established connection
dask_worker_2 | distributed.worker - INFO - Registered to: tcp://dask_scheduler:8786
dask_worker_2 | distributed.worker - INFO - -------------------------------------------------
dask_worker_2 | distributed.core - INFO - Starting established connection
dask_scheduler | distributed.scheduler - INFO - Register worker <WorkerState 'tcp://172.24.0.5:37363', name: tcp://172.24.0.5:37363, memory: 0, processing: 0>
dask_scheduler | distributed.scheduler - INFO - Starting worker compute stream, tcp://172.24.0.5:37363
dask_scheduler | distributed.core - INFO - Starting established connection
dask_worker_1 | distributed.worker - INFO - Registered to: tcp://dask_scheduler:8786
dask_worker_1 | distributed.worker - INFO - -------------------------------------------------
dask_worker_1 | distributed.core - INFO - Starting established connection
dask_worker_2 | distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
dask_scheduler | distributed.scheduler - INFO - Remove worker <WorkerState 'tcp://172.24.0.6:40681', name: tcp://172.24.0.6:40681, memory: 0, processing: 19>
dask_scheduler | distributed.core - INFO - Removing comms to tcp://172.24.0.6:40681
dask_worker_2 | distributed.nanny - INFO - Worker process 141 was killed by signal 15
dask_worker_2 | distributed.nanny - WARNING - Restarting worker
dask_worker_1 | distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
dask_worker_1 | distributed.nanny - INFO - Worker process 141 was killed by signal 15
dask_scheduler | distributed.scheduler - INFO - Remove worker <WorkerState 'tcp://172.24.0.5:37363', name: tcp://172.24.0.5:37363, memory: 0, processing: 32>
dask_scheduler | distributed.core - INFO - Removing comms to tcp://172.24.0.5:37363
dask_scheduler | distributed.scheduler - INFO - Task ('read-csv-998b9577be828ad2ffd0a47a3929fe2e', 15) marked as failed because 3 workers died while trying to run it
dask_scheduler | distributed.scheduler - INFO - Task ('read-csv-998b9577be828ad2ffd0a47a3929fe2e', 19) marked as failed because 3 workers died while trying to run it
dask_scheduler | distributed.scheduler - INFO - Task ('read-csv-998b9577be828ad2ffd0a47a3929fe2e', 17) marked as failed because 3 workers died while trying to run it
dask_scheduler | distributed.scheduler - INFO - Task ('read-csv-998b9577be828ad2ffd0a47a3929fe2e', 13) marked as failed because 3 workers died while trying to run it
dask_scheduler | distributed.scheduler - INFO - Task ('read-csv-998b9577be828ad2ffd0a47a3929fe2e', 1) marked as failed because 3 workers died while trying to run it
dask_scheduler | distributed.scheduler - INFO - Task ('read-csv-998b9577be828ad2ffd0a47a3929fe2e', 11) marked as failed because 3 workers died while trying to run it
dask_scheduler | distributed.scheduler - INFO - Task ('read-csv-998b9577be828ad2ffd0a47a3929fe2e', 22) marked as failed because 3 workers died while trying to run it
dask_scheduler | distributed.scheduler - INFO - Task ('read-csv-998b9577be828ad2ffd0a47a3929fe2e', 20) marked as failed because 3 workers died while trying to run it
dask_scheduler | distributed.scheduler - INFO - Task ('read-csv-998b9577be828ad2ffd0a47a3929fe2e', 3) marked as failed because 3 workers died while trying to run it
dask_scheduler | distributed.scheduler - INFO - Lost all workers
dask_worker_1 | distributed.nanny - WARNING - Restarting worker
My cluster consists of 2 workers with 1 GiB of memory each. I am working on a demo to show out-of-memory computation on a 4 GB file, on a Docker cluster running on my laptop.
Step 1: ddata = dd.read_csv(file_path, blocksize=summary_blocksize)  # summary_blocksize = 128 MB
Step 2: compute summary statistics of the raw data
Step 3: store the data in Postgres
All of this works seamlessly in a Jupyter notebook with the same cluster setup; a minimal sketch of these steps is included below.
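For reference, a rough sketch of those three steps (the Postgres connection string, table name, and the describe() call are simplified placeholders for what my app actually does):

import dask.dataframe as dd
from dask.distributed import Client
from sqlalchemy import create_engine

client = Client("tcp://dask_scheduler:8786")               # scheduler service from docker-compose

file_path = "/shared/test4gb.csv"                          # illustrative path
summary_blocksize = 128_000_000                            # ~128 MB partitions

# Step 1: read the large CSV into partitions
ddf = dd.read_csv(file_path, blocksize=summary_blocksize)

# Step 2: summary statistics of the raw data (result is a small pandas DataFrame)
summary = ddf.describe().compute()

# Step 3: store the summary in Postgres (placeholder connection string)
engine = create_engine("postgresql://user:password@postgres_database:5432/mydb")
summary.to_sql("raw_data_summary", engine, if_exists="replace")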
I also tried the following to explicitly set up out-of-memory computation, without luck…
# Set Dask configuration to allow out-of-memory spilling
dask.config.set({'worker.memory.target': False})
dask.config.set({'worker.memory.pause': False})
dask.config.set({'worker.memory.spill': 0.50})
dask.config.set({'worker.memory.terminate': False})
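As far as I understand, dask.config.set in the client process may not reach workers started in separate containers, so I also considered passing the same settings to the worker services through environment variables. An untested sketch, assuming Dask's DASK_-prefixed environment-variable convention (shown for one worker only):

  dask_worker_1:
    container_name: "dask_worker_1"
    image: myapp:latest
    environment:
      # hypothetical: same memory settings, read by the worker process itself
      - DASK_DISTRIBUTED__WORKER__MEMORY__TARGET=false
      - DASK_DISTRIBUTED__WORKER__MEMORY__SPILL=0.50
      - DASK_DISTRIBUTED__WORKER__MEMORY__PAUSE=false
      - DASK_DISTRIBUTED__WORKER__MEMORY__TERMINATE=false
    command: ["dask-worker", "tcp://dask_scheduler:8786", "--memory-limit=1 GiB"]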
@SOUMYASHUKLA Thanks for the details! I think it was a good idea to set those config values.
I’m wondering if your partitions might be too big. Have you tried reducing the blocksize? Maybe to ~50 MB or less?
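For example, something along these lines (the path is just a placeholder):

ddf = dd.read_csv(file_path, blocksize="50MB")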
You can also check the memory usage in the “Worker memory” plot on the dashboard; if you see orange/grey bars, it might mean the partitions are too large.
I’ll keep looking into this!
@pavithraes Thanks for helping out! It looks like blocksize is a “magical” attribute.
Here is what I tried:
- Replicated the settings from the distributed.yaml file in my Jupyter notebook setup to the Docker environment (not sure if that worked).
- Tried blocksize values from 5 MB to 128 MB (the maximum size suggested by a blog I read).
The code works for a 5 MB blocksize, with the following rolling warnings:
dask_worker_1 | distributed.worker - WARNING - Unmanaged memory use is high. This may indicate a memory leak or the memory may not be released to the OS; see https://distributed.dask.org/en/latest/worker.html#memtrim for more information. -- Unmanaged memory: 729.55 MiB -- Worker memory limit: 1.00 GiB
dask_worker_2 | distributed.worker - WARNING - Unmanaged memory use is high. This may indicate a memory leak or the memory may not be released to the OS; see https://distributed.dask.org/en/latest/worker.html#memtrim for more information. -- Unmanaged memory: 803.71 MiB -- Worker memory limit: 1.00 GiB
At some point my CPU utilization shoots up to 110% on both workers.
I tried setting a CPU limit from docker-compose, like the limit I set on RAM, but it appears that both of them cannot be set together.
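As an aside, the memtrim link in those warnings suggests that unmanaged memory can sometimes be returned to the OS by tuning glibc malloc. An untested sketch of adding that environment variable to a worker service in docker-compose (the 65536 value is just an example):

  dask_worker_1:
    image: myapp:latest
    environment:
      # hypothetical: trim freed memory back to the OS more aggressively
      - MALLOC_TRIM_THRESHOLD_=65536
    command: ["dask-worker", "tcp://dask_scheduler:8786", "--memory-limit=1 GiB"]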
@SOUMYASHUKLA The ideal number of partitions depends on the memory available. So, even though ~100 MB chunks is a general rule of thumb, it doesn’t work well for small RAM capacities like yours; with 16 threads per worker, even a few 128 MB partitions in flight can exceed a 1 GiB limit. I’m guessing that’s why your code works for small blocks.
May I ask what computations (summary stats) you are performing on the data? I think they may also be using memory resources.
@pavithraes Got it! Thanks. I am calculating basic summary statistics such as the mean, standard deviation, and others (all available as Dask DataFrame methods) after reading the large CSV file. I was able to make this work by copying the updated distributed.yaml file into the Docker container directly.
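For completeness, a rough sketch of what that looks like; the file path assumes the container runs as root, and the values mirror the settings I listed earlier:

# In the Dockerfile (illustrative):
#   COPY distributed.yaml /root/.config/dask/distributed.yaml

# distributed.yaml (relevant section only):
distributed:
  worker:
    memory:
      target: false      # don't spill based on a target fraction
      spill: 0.50        # spill to disk above 50% of the memory limit
      pause: false       # don't pause the worker
      terminate: false   # don't let the nanny terminate the worker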
Interestingly, for my cluster (2 workers, 1 GiB memory each), the code works with a 1 MB blocksize. I am also seeing that, ideally, the number of partitions for this cluster should be greater than 2000 for any data processing (reading CSV or reading from SQL) to work.
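For example (illustrative numbers), this is roughly how I check and adjust the partition count:

ddf = dd.read_csv("/shared/test4gb.csv", blocksize="1MB")   # ~4000 partitions for a 4 GB file
print(ddf.npartitions)

# a collection can also be re-chunked to a similar count, e.g.:
# ddf = ddf.repartition(npartitions=2000)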
@SOUMYASHUKLA Thanks for the details! And, I’m glad you got it working!
As a side note, I see this need for a large number of partitions while demonstrating Dask computations on default Binder-hosted notebooks, which also offer ~2 GB of RAM overall.