Running Dask on Kubernetes locally and distributing files among local workers

I have a simple application that reads a log file from disk (in production, preferably from shared storage such as S3), does some aggregations, and prints the results.

When I launch a local cluster:

from dask.distributed import Client
import dask.dataframe as dd

client = Client(n_workers=4, threads_per_worker=4)

ddf = dd.read_json('./logs/2GB.log.txt', encoding='utf-8', blocksize=2000000)

print(ddf.reqUser.value_counts().nlargest(10).compute())

It works fine and gives the desired results.

Now I am trying to do a similar thing with Kubernetes:

import dask.dataframe as dd
from dask_kubernetes.operator import KubeCluster
from dask.distributed import Client

client = Client("localhost:8786")

ddf = dd.read_json('./logs/2GB.log.txt', encoding='utf-8', blocksize=2000000)
print(ddf.reqUser.value_counts().nlargest(10).compute())

This raises a FileNotFoundError, which is correct: the path does not exist on any of the worker pods. To fix this I tried mounting the log directory on the pods with a PVC and changed the file path accordingly. However, I am still getting the same error.

My question is: what is the correct way to read files and distribute them among workers locally on Kubernetes? On an EKS cluster I’ll ideally be reading the file from S3, so as long as I have the correct permissions this should not be an issue on AWS. But how do I replicate the same thing locally? A tutorial or a working example of how people use Kubernetes here would be great.

Hi @matrixbegins,

So if I understand correctly, before moving to AWS and using S3 storage, you want to be able to try things out on-prem, or even on a single-node Kubernetes instance? If that is the case, this is really more a Kubernetes question than a Dask one.

I’ve come across a nice Stack Overflow answer: docker - How to share storage between Kubernetes pods? - Stack Overflow. I guess you would need to run an NFS server and mount it as a PVC, but this is probably tricky to do correctly. Alternatively, you could start your own S3-compatible storage with something like MinIO.

I have already set this up. Here is my cluster definition:

# cluster.yaml
apiVersion: kubernetes.dask.org/v1
kind: DaskCluster
metadata:
  name: dask-test-local
spec:
  worker:
    replicas: 2
    spec:
      containers:
      - name: worker
        image: "ghcr.io/dask/dask:latest"
        imagePullPolicy: "IfNotPresent"
        args:
          - dask-worker
          - --name
          - $(DASK_WORKER_NAME)
        volumeMounts:
          - mountPath: /opt/ranger/logs
            name: logs-pv-volume
      volumes:
        - name: logs-pv-volume
          persistentVolumeClaim:
            claimName: logs-pv-claim

  scheduler:
    spec:
      containers:
      - name: scheduler
        image: "ghcr.io/dask/dask:latest"
        imagePullPolicy: "IfNotPresent"
        args:
          - dask-scheduler
        ports:
          - name: tcp-comm
            containerPort: 8786
            protocol: TCP
          - name: http-dashboard
            containerPort: 8787
            protocol: TCP
        readinessProbe:
          httpGet:
            port: http-dashboard
            path: /health
          initialDelaySeconds: 5
          periodSeconds: 10
        livenessProbe:
          httpGet:
            port: http-dashboard
            path: /health
          initialDelaySeconds: 15
          periodSeconds: 20
    service:
      type: NodePort
      selector:
        dask.org/cluster-name: dask-test-local
        dask.org/component: scheduler
      ports:
      - name: tcp-comm
        protocol: TCP
        port: 8786
        targetPort: "tcp-comm"
      - name: http-dashboard
        protocol: TCP
        port: 8787
        targetPort: "http-dashboard"

# pvc.yml
kind: PersistentVolume
apiVersion: v1
metadata:
  name: logs-pv-volume
  labels:
    type: local
    app: log_dir
spec:
  storageClassName: manual
  capacity:
    storage: 10Gi
  accessModes:
    - ReadWriteMany
  hostPath:
    path: "/Users/ap/Documents/projects/private/softwares/python-demo/logs"

---
kind: PersistentVolumeClaim
apiVersion: v1
metadata:
  name: logs-pv-claim
  labels:
    app: log_dir
spec:
  storageClassName: manual
  accessModes:
    - ReadWriteMany
  resources:
    requests:
      storage: 10Gi


This is actually taken directly from the example in the Dask distributed docs: Custom Resources — Dask Kubernetes 2021.03.0+170.g56600f7 documentation.
I have only added the volume section. And I don’t think it’s really a Kubernetes question: I am facing a “no such file or directory” error, yet when I SSH into a pod, I can see the correct files under /opt/ranger/logs.

import dask.dataframe as dd
from dask_kubernetes.operator import KubeCluster
from dask.distributed import Client

client = Client("localhost:8786")
mdf = dd.read_json('/opt/ranger/logs/200MB_*ranger.log.txt', encoding='utf-8', blocksize=4000000)

print(mdf.reqUser.value_counts().nlargest(10).compute())

When I run this, I get the following error:

---------------------------------------------------------------------------
OSError                                   Traceback (most recent call last)
File /opt/homebrew/lib/python3.10/site-packages/dask/backends.py:133, in CreationDispatch.register_inplace.<locals>.decorator.<locals>.wrapper(*args, **kwargs)
    132 try:
--> 133     return func(*args, **kwargs)
    134 except Exception as e:

File /opt/homebrew/lib/python3.10/site-packages/dask/dataframe/io/json.py:213, in read_json(url_path, orient, lines, storage_options, blocksize, sample, encoding, errors, compression, meta, engine, include_path_column, path_converter, **kwargs)
    212 if blocksize:
--> 213     b_out = read_bytes(
    214         url_path,
    215         b"\n",
    216         blocksize=blocksize,
    217         sample=sample,
    218         compression=compression,
    219         include_path=include_path_column,
    220         **storage_options,
    221     )
    222     if include_path_column:

File /opt/homebrew/lib/python3.10/site-packages/dask/bytes/core.py:84, in read_bytes(urlpath, delimiter, not_zero, blocksize, sample, compression, include_path, **kwargs)
     83 if len(paths) == 0:
---> 84     raise OSError("%s resolved to no files" % urlpath)
     86 if blocksize is not None:

OSError: /opt/ranger/logs/200MB_*ranger.log.txt resolved to no files

The above exception was the direct cause of the following exception:

OSError                                   Traceback (most recent call last)
Cell In[14], line 3
      1 # mdf = dd.read_json('./logs/200MB_*ranger.log.txt', encoding='utf-8', blocksize=4000000)
----> 3 mdf = dd.read_json('/opt/ranger/logs/200MB_*ranger.log.txt', encoding='utf-8', blocksize=4000000)
      5 print(mdf.reqUser.value_counts().nlargest(10).compute())

File /opt/homebrew/lib/python3.10/site-packages/dask/backends.py:135, in CreationDispatch.register_inplace.<locals>.decorator.<locals>.wrapper(*args, **kwargs)
    133     return func(*args, **kwargs)
    134 except Exception as e:
--> 135     raise type(e)(
    136         f"An error occurred while calling the {funcname(func)} "
    137         f"method registered to the {self.backend} backend.\n"
    138         f"Original Message: {e}"
    139     ) from e

OSError: An error occurred while calling the read_json method registered to the pandas backend.
Original Message: /opt/ranger/logs/200MB_*ranger.log.txt resolved to no files

From the error it’s clear that the code is trying to read files from the host, not from the pod’s path.
So my question is: what is the correct way of reading files and distributing them across multiple workers?

Another version that I can try is as follows:

import dask, time, datetime

import dask.dataframe as dd

from dask_kubernetes.operator import KubeCluster
from dask.distributed import Client

client = Client("localhost:8786")

def get_top_k_users_delayed():
    mdf = dd.read_json('/opt/ranger/logs/200MB_1*_ranger.log.txt', encoding='utf-8', blocksize=4000000)
    return mdf.reqUser.value_counts().nlargest(10).compute()
    

st = time.time()
new_df_future = client.submit(get_top_k_users_delayed)

print(new_df_future.result())
et = time.time()
print('Execution time:', (et - st), 's')

output:

hbase      1127737
hive       1127387
impala     1125742
iceberg    1124734
Name: reqUser, dtype: int64
Execution time: 191.05175614356995 s

Now this code runs properly, but I can’t see any progress in the status dashboard, and I’m not sure where it’s running. Maybe it’s running on the host without any parallelisation, because I cannot see any utilization info for the workers. The dashboard is just blank.

This is the correct way, but in order to correctly distribute the work (infer the DataFrame metadata, compute partitions and tasks), the Client/Scheduler probably also needs to actually see the file. From where are you executing the Client code? Not from a Kubernetes pod?
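To see why the client-side path matters: `read_json` expands the glob pattern on the machine where the Client runs, before any task reaches a worker. You can check this with only the standard library:

```python
import glob

# read_json expands this pattern on the Client machine while building the
# task graph; if /opt/ranger/logs only exists inside the worker pods, the
# expansion is empty here and Dask raises "resolved to no files".
paths = glob.glob('/opt/ranger/logs/200MB_*ranger.log.txt')
print(paths)  # likely [] on the host
```

If this prints an empty list on the machine running your notebook, the error above is expected regardless of what the pods can see.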

Yes, probably. This is only a workaround (it would be better to just be able to work with a Dask DataFrame), but you can try what’s described in Launch Tasks from Tasks — Dask.distributed 2023.11.0 documentation in order to submit tasks to the cluster from other tasks.

This is not the correct workaround, simply because submitting tasks like this only uses one worker. If the code really used parallelism across multiple workers, it would take less than 20 seconds to compute on my local machine. I’ll set up an EKS cluster and use S3 for my work; setting up locally will have to wait. Many thanks for the insight. I am new to Dask and the big data world. Can’t thank you enough for being so patient and answering my questions.

Did you try this using the get_client function?

@guillaumeeb
I dropped the idea of setting up locally and am focusing on deploying this on an AWS EKS cluster.

1 Like