Hello everyone,
I’m running Dask on Kubernetes, deploying a cluster with 4 workers, each with 48 GiB of RAM available.
I’m working with the NYC taxi dataset (TLC Trip Record Data) and I’d like to apply a simple map function that increments ‘tip_amount’ by 1.
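For clarity, the transformation itself is trivial; on a tiny made-up pandas sample it would just be:

```python
import pandas as pd

# Made-up sample, only to illustrate the intended transformation
sample = pd.DataFrame({"tip_amount": [1.5, 0.0, 3.2]})
sample["tip_amount"] = sample["tip_amount"].map(lambda i: i + 1)
print(sample)
#    tip_amount
# 0         2.5
# 1         1.0
# 2         4.2
```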
Whenever I run the application and look at the Dask dashboard, I notice that the job appears to be almost done, yet it never completes.
Here is how it looks on the dashboard: [dashboard screenshot]
Below is the source code:
```python
import time
from time import sleep

import dask.dataframe as dd
from dask_kubernetes.operator import KubeCluster
from distributed import Client

if __name__ == '__main__':
    # 4 workers, 48 GiB of memory each
    cluster = KubeCluster(
        name="test",
        image='ghcr.io/dask/dask:latest',
        n_workers=4,
        env={"EXTRA_PIP_PACKAGES": "pyarrow fastparquet s3fs --upgrade"},
        resources={"requests": {"memory": "48Gi"}, "limits": {"memory": "48Gi"}},
    )
    client = Client(cluster)

    start = time.time()
    df = dd.read_parquet(
        "~/taxi-parquet/",
        engine="pyarrow",
        ignore_metadata_file=False,
        split_row_groups=True,
        storage_options={
            "key": <aws public key>,
            "secret": <aws secret key>,
            "token": <aws token>,
        },
    )

    def inc(i):
        return i + 1

    # Increment tip_amount by 1 and materialize the result on the client
    df = df.tip_amount.map(inc)
    df.compute()
    stop = time.time()
    print(round(stop - start, 2))

    client.close()
    sleep(1)
    cluster.close()
```
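For what it's worth, if the problem turns out to be the final compute() pulling the entire column back onto the client, one variant I could try would replace the last two steps (the map and the compute) with an aggregation that only returns a scalar; the explicit meta here is just an assumed output dtype so Dask doesn't have to infer it:

```python
# Same increment, but only a scalar travels back to the client;
# meta=("tip_amount", "float64") is an assumed output dtype
total = df.tip_amount.map(inc, meta=("tip_amount", "float64")).sum().compute()
print(total)
```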
The same code with pyspark didn’t give any problems (even using fewer resources), whereas with Dask the job gets stuck and is eventually killed (probably after exceeding some timeout limit). Does anyone have any idea how this can be fixed?
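By “the same code with pyspark” I mean roughly the equivalent of the following sketch (real input path, S3 credentials, and session config omitted):

```python
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

# Rough pyspark equivalent of the Dask job above
spark = SparkSession.builder.appName("taxi-inc").getOrCreate()
taxi = spark.read.parquet("taxi-parquet/")  # placeholder path
# Increment tip_amount by 1, then force execution with an aggregation
taxi.withColumn("tip_amount", F.col("tip_amount") + 1) \
    .select(F.sum("tip_amount")).show()
```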
Thanks in advance,
Leonardo