I have already set this up. Here is my cluster definition:
# cluster.yaml
apiVersion: kubernetes.dask.org/v1
kind: DaskCluster
metadata:
  name: dask-test-local
spec:
  worker:
    replicas: 2
    spec:
      containers:
        - name: worker
          image: "ghcr.io/dask/dask:latest"
          imagePullPolicy: "IfNotPresent"
          args:
            - dask-worker
            - --name
            - $(DASK_WORKER_NAME)
          volumeMounts:
            - mountPath: /opt/ranger/logs
              name: logs-pv-volume
      volumes:
        - name: logs-pv-volume
          persistentVolumeClaim:
            claimName: logs-pv-claim
  scheduler:
    spec:
      containers:
        - name: scheduler
          image: "ghcr.io/dask/dask:latest"
          imagePullPolicy: "IfNotPresent"
          args:
            - dask-scheduler
          ports:
            - name: tcp-comm
              containerPort: 8786
              protocol: TCP
            - name: http-dashboard
              containerPort: 8787
              protocol: TCP
          readinessProbe:
            httpGet:
              port: http-dashboard
              path: /health
            initialDelaySeconds: 5
            periodSeconds: 10
          livenessProbe:
            httpGet:
              port: http-dashboard
              path: /health
            initialDelaySeconds: 15
            periodSeconds: 20
    service:
      type: NodePort
      selector:
        dask.org/cluster-name: dask-test-local
        dask.org/component: scheduler
      ports:
        - name: tcp-comm
          protocol: TCP
          port: 8786
          targetPort: "tcp-comm"
        - name: http-dashboard
          protocol: TCP
          port: 8787
          targetPort: "http-dashboard"
# pvc.yml
kind: PersistentVolume
apiVersion: v1
metadata:
  name: logs-pv-volume
  labels:
    type: local
    app: log_dir
spec:
  storageClassName: manual
  capacity:
    storage: 10Gi
  accessModes:
    - ReadWriteMany
  hostPath:
    path: "/Users/ap/Documents/projects/private/softwares/python-demo/logs"
---
kind: PersistentVolumeClaim
apiVersion: v1
metadata:
  name: logs-pv-claim
  labels:
    app: log_dir
spec:
  storageClassName: manual
  accessModes:
    - ReadWriteMany
  resources:
    requests:
      storage: 10Gi
This is taken directly from the example in the Dask Kubernetes docs: Custom Resources — Dask Kubernetes 2021.03.0+170.g56600f7 documentation.
I have only added the volume section, so I don't think this is really a Kubernetes question. I am facing a "no such file or directory" issue. However, when I SSH into a pod, I can see the correct files under /opt/ranger/logs.
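Instead of SSHing into a single pod, the same check can be done from the client for every worker at once with `Client.run`, which executes a function on each worker. A sketch (shown here against an in-process client and a temp directory as stand-ins; on the real cluster you would pass `Client("localhost:8786")` and `"/opt/ranger/logs"`):

```python
# Diagnostic sketch: client.run executes a function on every worker, so this
# shows what the *workers* see on disk, not what the client machine sees.
# Stand-in assumptions: an in-process client and a temp dir instead of the
# real cluster address and /opt/ranger/logs.
import os
import tempfile

from dask.distributed import Client

tmp = tempfile.mkdtemp()
open(os.path.join(tmp, "sample.log.txt"), "w").close()

client = Client(processes=False)        # stand-in for Client("localhost:8786")
listings = client.run(os.listdir, tmp)  # {worker_address: [filenames]}
print(listings)
client.close()
```

If the files show up in every worker's listing, the volume mount itself is working and the problem is on the client side.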
import dask.dataframe as dd  # missing from the original snippet
from dask_kubernetes.operator import KubeCluster
from dask.distributed import Client

client = Client("localhost:8786")
mdf = dd.read_json('/opt/ranger/logs/200MB_*ranger.log.txt', encoding='utf-8', blocksize=4000000)
print(mdf.reqUser.value_counts().nlargest(10).compute())
When I run this, the following is the error that I get:
---------------------------------------------------------------------------
OSError Traceback (most recent call last)
File /opt/homebrew/lib/python3.10/site-packages/dask/backends.py:133, in CreationDispatch.register_inplace.<locals>.decorator.<locals>.wrapper(*args, **kwargs)
132 try:
--> 133 return func(*args, **kwargs)
134 except Exception as e:
File /opt/homebrew/lib/python3.10/site-packages/dask/dataframe/io/json.py:213, in read_json(url_path, orient, lines, storage_options, blocksize, sample, encoding, errors, compression, meta, engine, include_path_column, path_converter, **kwargs)
212 if blocksize:
--> 213 b_out = read_bytes(
214 url_path,
215 b"\n",
216 blocksize=blocksize,
217 sample=sample,
218 compression=compression,
219 include_path=include_path_column,
220 **storage_options,
221 )
222 if include_path_column:
File /opt/homebrew/lib/python3.10/site-packages/dask/bytes/core.py:84, in read_bytes(urlpath, delimiter, not_zero, blocksize, sample, compression, include_path, **kwargs)
83 if len(paths) == 0:
---> 84 raise OSError("%s resolved to no files" % urlpath)
86 if blocksize is not None:
OSError: /opt/ranger/logs/200MB_*ranger.log.txt resolved to no files
The above exception was the direct cause of the following exception:
OSError Traceback (most recent call last)
Cell In[14], line 3
1 # mdf = dd.read_json('./logs/200MB_*ranger.log.txt', encoding='utf-8', blocksize=4000000)
----> 3 mdf = dd.read_json('/opt/ranger/logs/200MB_*ranger.log.txt', encoding='utf-8', blocksize=4000000)
5 print(mdf.reqUser.value_counts().nlargest(10).compute())
File /opt/homebrew/lib/python3.10/site-packages/dask/backends.py:135, in CreationDispatch.register_inplace.<locals>.decorator.<locals>.wrapper(*args, **kwargs)
133 return func(*args, **kwargs)
134 except Exception as e:
--> 135 raise type(e)(
136 f"An error occurred while calling the {funcname(func)} "
137 f"method registered to the {self.backend} backend.\n"
138 f"Original Message: {e}"
139 ) from e
OSError: An error occurred while calling the read_json method registered to the pandas backend.
Original Message: /opt/ranger/logs/200MB_*ranger.log.txt resolved to no files
From the error it's clear that the code is trying to resolve the glob on the host where the client runs, not inside the pods where the volume is mounted.
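This can be reproduced without any cluster at all: `dd.read_json` expands the glob (via fsspec) on the machine where the client code runs, before any work is shipped to the scheduler, so a pattern that only matches files inside the pods resolves to nothing. A minimal illustration, using a stand-in path that does not exist locally:

```python
# The glob is expanded locally, before anything reaches the workers.
# Stand-in path: any pattern that exists only inside the pods behaves the same.
import glob

pattern = '/tmp/does-not-exist-on-this-machine/200MB_*ranger.log.txt'
matches = glob.glob(pattern)
print(matches)  # [] -> this empty list is what produces "resolved to no files"
```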
Therefore my question is: what is the correct way of reading and distributing these files across multiple workers?