Only one worker out of seven carries out the workload

Hello folks,

I’m running Dask on K8s; in this case I have 7 workers, each with 4G of RAM available.
Looking at the dashboard, I notice that one of them seems to be doing all the work. I was expecting the workload to be split among all the workers, but this doesn’t seem to happen.

Does anyone know the reason behind this behaviour?
Thanks in advance,

Leonardo

Below is the source code:

import time
from time import sleep

import dask.dataframe as dd
from dask_kubernetes.operator import KubeCluster
from distributed import Client


if __name__ == '__main__':
    cluster = KubeCluster(name="test",
                          image='ghcr.io/dask/dask:latest',
                          n_workers=7,
                          env={"EXTRA_PIP_PACKAGES": "pyarrow fastparquet s3fs --upgrade",
                              "DASK_DISTRIBUTED__WORKERS__MEMORY__SPILL": "0.5",
                              "DASK_DISTRIBUTED__WORKERS__MEMORY__TARGET": "0.4",
                              "DASK_DISTRIBUTED__WORKERS__MEMORY__TERMINATE": "0.98"},
                          resources={"requests": {"memory": "4Gi"}, "limits": {"memory": "4Gi"}})


    client = Client(cluster)

    start = time.time()

    df = dd.read_parquet("s3://path",
                        engine="pyarrow",
                        ignore_metadata_file=False,
                        split_row_groups=True,
                        storage_options={
                             "key": <AWS public key>,
                             "secret": <AWS secret key>,
                             "token": <AWS token>
                        })

    
    def inc(i):
        return i + 1

    df['tip_amount'] = df.tip_amount.map(inc)
    print(df.head())

    stop = time.time()
    print(round(stop - start, 2))
    client.close()
    sleep(1)
    cluster.close()

Given your code, this is to be expected. The only time you trigger any computation is when you call df.head(). head() only reads the first partition, so only one task of each operation runs (and only one worker needs to do anything). You can see this in the progress bars, where there is only 1 of each task type.
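As a minimal sketch of what’s happening (toy data rather than your S3 dataset):

import pandas as pd
import dask.dataframe as dd

# A small dataframe split into 8 partitions, just to illustrate.
pdf = pd.DataFrame({"tip_amount": range(80)})
df = dd.from_pandas(pdf, npartitions=8)

# head() only needs the first few rows, so only the first partition is
# computed; the other partitions never turn into tasks on the cluster.
print(df.head())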

If you did something else, like print(df.tip_amount.mean().compute()), it would access the whole dataframe and you should see more cluster activity.
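If you want to force every partition to load up front and watch the work spread out, one option (a sketch, not something from the reply above) is persist plus wait:

from distributed import wait

# persist() submits one task per partition, so the scheduler can spread
# them across all seven workers; the dashboard should show activity on
# every worker while the partitions are read into cluster memory.
df = df.persist()
wait(df)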

Thanks a lot!
Now it makes more sense.