Dask job with map function gets stuck

Hello everyone,

I’m running Dask on Kubernetes, deploying a cluster with 4 workers, each with 48 GiB of RAM available.
I’m working with the NYC taxi dataset (TLC Trip Record Data) and I’d like to apply a simple map function that increments ‘tip_amount’ by 1.
Whenever I run the application and look at the Dask dashboard, I notice that the job appears to be almost done, yet it never completes.
Here is how it looks:

Below is the source code:

import time
from time import sleep

import dask.dataframe as dd
from dask_kubernetes.operator import KubeCluster
from distributed import Client

if __name__ == '__main__':
    # Deploy a 4-worker cluster on Kubernetes, 48Gi of memory per worker
    cluster = KubeCluster(name="test",
                          image='ghcr.io/dask/dask:latest',
                          n_workers=4,
                          env={"EXTRA_PIP_PACKAGES": "pyarrow fastparquet s3fs --upgrade"},
                          resources={"requests": {"memory": "48Gi"}, "limits": {"memory": "48Gi"}})

    client = Client(cluster)

    start = time.time()

    # Read the taxi dataset from Parquet
    df = dd.read_parquet("~/taxi-parquet/",
                         engine="pyarrow",
                         ignore_metadata_file=False,
                         split_row_groups=True,
                         storage_options={
                             "key": <aws public key>,
                             "secret": <aws secret key>,
                             "token": <aws token>
                         })

    def inc(i):
        return i + 1

    # Increment tip_amount by 1 and pull the result back to the client
    df = df.tip_amount.map(inc)
    df.compute()

    stop = time.time()

    print(round((stop - start), 2))
    client.close()
    sleep(1)
    cluster.close()

The same code with PySpark ran without any problem (even using fewer resources), but with Dask this job gets stuck and is eventually killed (probably after exceeding a timeout limit). Does anyone have an idea how this can be fixed?

Thanks in advance,
Leonardo

I assume your real code has "s3://taxi-parquet/" instead of ~/?

The dashboard is saying that the computation is finished. However, you’re stuck trying to retrieve the output.
How large is the output you’re trying to retrieve? Kilobytes? Gigabytes? Do you have enough bandwidth to transfer it back to your desktop/laptop? By default, compute() transfers everything through the scheduler; if you’re fetching gigabytes, it will kill the scheduler.

Save the output to S3 with df.to_parquet instead of fetching it onto your laptop.
Alternatively, make sure that you can connect directly from the client to the workers, then start the client with Client(direct_to_workers=True). This will bypass the scheduler.
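
For illustration, a rough sketch of both options (assuming df is still the DataFrame returned by read_parquet, and s3://taxi-output/ is just a placeholder bucket):

# Option 1: write the result back to S3 instead of pulling it to the client
out = df.assign(tip_amount=df.tip_amount.map(inc))
out.to_parquet("s3://taxi-output/",   # placeholder output path
               engine="pyarrow")      # plus the same storage_options used for read_parquet

# Option 2: fetch results directly from the workers, bypassing the scheduler
# (requires that your machine can reach the worker pods over the network)
client = Client(cluster, direct_to_workers=True)
result = df.tip_amount.map(inc).compute()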


Thank you very much!

Yes, in the real code the path is the s3:// one.

I figured out that the problem was the compute() call and managed to fix it with persist(). Thanks also for the Client(direct_to_workers=True) hint.
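
For reference, roughly what the persist()-based version looks like (using the same df and inc as in the snippet above):

# persist() materializes the result in the workers' memory and returns a lazy
# handle immediately, so nothing large is transferred back to the client
tips = df.tip_amount.map(inc).persist()

# small follow-up reductions are then cheap to bring back
print(tips.mean().compute())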