Hello folks,
I’m running Dask on K8s; in this case I have 7 workers with 4 GiB of RAM each.
Looking at the dashboard, I notice that one of them seems to be carrying all the work. I was expecting the workload to be split among all the workers, but that doesn’t seem to happen.
Does anyone know the reason behind this behaviour?
Thanks in advance,
Leonardo
The source code is below.
import time
import os
import dask.config
from time import sleep
import dask.dataframe as dd
from dask_sql import Context
from dask_kubernetes.operator import KubeCluster
from distributed import Client
if __name__ == '__main__':
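    # 7 workers, each with a 4 GiB memory request/limit; the env vars are meant to
    # install extra packages on the workers and tune the worker memory
    # spill/target/terminate fractions.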
    cluster = KubeCluster(name="test",
                          image='ghcr.io/dask/dask:latest',
                          n_workers=7,
                          env={"EXTRA_PIP_PACKAGES": "pyarrow fastparquet s3fs --upgrade",
                               "DASK_DISTRIBUTED__WORKERS__MEMORY__SPILL": "0.5",
                               "DASK_DISTRIBUTED__WORKERS__MEMORY__TARGET": "0.4",
                               "DASK_DISTRIBUTED__WORKERS__MEMORY__TERMINATE": "0.98"},
                          resources={"requests": {"memory": "4Gi"}, "limits": {"memory": "4Gi"}})
    client = Client(cluster)
    start = time.time()
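    # Lazily read the Parquet dataset from S3; split_row_groups=True makes each
    # row group its own partition.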
    df = dd.read_parquet("s3://path",
                         engine="pyarrow",
                         ignore_metadata_file=False,
                         split_row_groups=True,
                         storage_options={
                             "key": "<AWS public key>",
                             "secret": "<AWS secret key>",
                             "token": "<AWS token>"
                         })
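    # Element-wise increment on tip_amount, then print the first few rows and the elapsed time.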
    def inc(i):
        return i + 1

    df['tip_amount'] = df.tip_amount.map(inc)
    print(df.head())

    stop = time.time()
    print(round(stop - start, 2))

    client.close()
    sleep(1)
    cluster.close()
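In case it helps with the diagnosis, here is a rough sketch (not part of the script above) of what I can run after building df to see how the partitions end up spread across workers. It assumes the client and df objects from the script and that the persisted frame fits in cluster memory; futures_of, wait and who_has come from distributed.

from collections import Counter
from distributed import futures_of, wait

print("npartitions:", df.npartitions)        # how many partitions read_parquet produced
print("workers:", list(client.nthreads()))   # addresses of the workers the client sees

dfp = df.persist()                           # materialise the partitions on the cluster
futs = futures_of(dfp)                       # one future per partition
wait(futs)                                   # block until every partition is in memory
who_has = client.who_has(futs)               # partition key -> addresses of workers holding it
print(Counter(addr for addrs in who_has.values() for addr in addrs))

If that counter is heavily skewed towards a single address, it at least confirms what the dashboard is showing.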