Worker pods exist but client cannot connect to them or workers do not accept jobs

I have been trying to configure Dask-Gateway for a few days, but I am still not able to run any analysis from the client side. I have tried many configurations and everything seems to be working fine, yet jobs are never submitted to the worker pods and the client object is created without any resources, so I am completely stuck. I will post only one of the configurations I have tried here, and I would appreciate it if anyone can spot an issue with my setup.

Output of some k8s commands:

(reana) alputer@alputer:~/REANA/REANA/reana/helm/reana$ k get pods
NAME                                                 READY   STATUS    RESTARTS   AGE
api-dask-gateway-546796f64f-zv6d2                    1/1     Running   0          52m
controller-dask-gateway-6b8f5f5d85-zv9p2             1/1     Running   0          52m
dask-scheduler-a32d07875efc4e60ae3dbaf45c223092      1/1     Running   0          7m24s
dask-worker-a32d07875efc4e60ae3dbaf45c223092-4rtpz   1/1     Running   0          7m16s
dask-worker-a32d07875efc4e60ae3dbaf45c223092-pd6sp   1/1     Running   0          7m16s
traefik-dask-gateway-54dcb8458f-ntnzc                1/1     Running   0          52m

CLIENT CODE:

# 1) Authenticate against the Dask Gateway
# 2) Get a personal cluster
# 3) Ask for a worker node
# 4) Submit the analysis


from dask_gateway import Gateway
from dask_gateway.auth import BasicAuth

gateway = Gateway(
    address="http://194.12.178.4:30081",
    auth=BasicAuth(username="alputer", password="dummy"),
)
gateway.list_clusters()

cluster = gateway.new_cluster()
cluster.scale(2)

client = cluster.get_client()

import dask.array as da

a = da.random.normal(size=(1000, 1000), chunks=(500, 500))

# Code hangs here indefinitely since jobs are not sent to the worker pods
a.mean().compute()

OUTPUT OF THE CODE:

(reana) alputer@alputer:~/REANA/REANA/reana-dask-controller$ python client3.py
/home/alputer/.virtualenvs/reana/lib/python3.8/site-packages/distributed/client.py:1388: VersionMismatchWarning: Mismatched versions found

+-------------+----------------+----------------+---------+
| Package     | Client         | Scheduler      | Workers |
+-------------+----------------+----------------+---------+
| dask        | 2023.5.0       | 2024.1.0       | None    |
| distributed | 2023.5.0       | 2024.1.0       | None    |
| msgpack     | 1.0.8          | 1.0.7          | None    |
| python      | 3.8.12.final.0 | 3.11.7.final.0 | None    |
| toolz       | 0.12.1         | 0.12.0         | None    |
| tornado     | 6.4.1          | 6.4            | None    |
+-------------+----------------+----------------+---------+
  warnings.warn(version_module.VersionMismatchWarning(msg[0]["warning"]))
<Client: 'tls://10.244.0.127:8786' processes=0 threads=0, memory=0 B>
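
The printed client reports processes=0 threads=0, memory=0 B, i.e. no workers had registered with the scheduler at the moment the client was created, and the table above also shows the client and scheduler running different dask/distributed and Python versions. As a hedged sketch (not part of the original script), a few checks could be added right after client = cluster.get_client(); wait_for_workers, scheduler_info and get_versions are standard distributed.Client methods:

client.wait_for_workers(2, timeout=120)    # block until both requested workers have registered
print(client.scheduler_info()["workers"])  # should list two worker entries once they are up
client.get_versions(check=True)            # raises if client/scheduler/worker versions diverge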

My Helm Chart

## Provide a name to partially substitute for the full names of resources (will maintain the release name)
##
nameOverride: ""

## Provide a name to substitute for the full names of resources
##
fullnameOverride: ""

# gateway nested config relates to the api Pod and the dask-gateway-server
# running within it, the k8s Service exposing it, as well as the schedulers
# (gateway.backend.scheduler) and workers (gateway.backend.worker) created by the
# controller when a DaskCluster k8s resource is registered.
gateway:
  # Number of instances of the gateway-server to run
  replicas: 1

  # Annotations to apply to the gateway-server pods.
  annotations: {}

  # Resource requests/limits for the gateway-server pod.
  resources:
    requests:
      memory: "256M"
      cpu: 0.5
    limits:
      memory: "1G"
      cpu: 1

  # Path prefix to serve dask-gateway api requests under
  # This prefix will be added to all routes the gateway manages
  # in the traefik proxy.
  prefix: /

  # The gateway server log level
  loglevel: DEBUG

  # The image to use for the dask-gateway-server pod (api pod)
  image:
    name: ghcr.io/dask/dask-gateway-server
    tag: "2024.1.0"
    pullPolicy: IfNotPresent

  # Add additional environment variables to the gateway pod
  # e.g.
  # env:
  # - name: MYENV
  #   value: "my value"
  env: []

  # Image pull secrets for gateway-server pod
  imagePullSecrets: []

  # Configuration for the gateway-server service
  service:
    annotations: {}

  auth:
    # The auth type to use. One of {simple, kerberos, jupyterhub, custom}.
    type: simple

    simple:
      # A shared password to use for all users.
      password: "dummy"

    kerberos:
      # Path to the HTTP keytab for this node.
      keytab:

    jupyterhub:
      # A JupyterHub api token for dask-gateway to use. See
      # https://gateway.dask.org/install-kube.html#authenticating-with-jupyterhub.
      apiToken:

      # The JupyterHub Helm chart will automatically generate a token for a
      # registered service. If you don't specify an apiToken explicitly as
      # required in dask-gateway version <=2022.6.1, the dask-gateway Helm chart
      # will try to look for a token from a k8s Secret created by the JupyterHub
      # Helm chart in the same namespace. A failure to find this k8s Secret and
      # key will cause a MountFailure when the api-dask-gateway pod is
      # starting.
      apiTokenFromSecretName: hub
      apiTokenFromSecretKey: hub.services.dask-gateway.apiToken

      # JupyterHub's api url. Inferred from JupyterHub's service name if running
      # in the same namespace.
      apiUrl:

    custom:
      # The full authenticator class name.
      class:

      # Configuration fields to set on the authenticator class.
      config: {}

  livenessProbe:
    # Enables the livenessProbe.
    enabled: true
    # Configures the livenessProbe.
    initialDelaySeconds: 5
    timeoutSeconds: 2
    periodSeconds: 10
    failureThreshold: 6
  readinessProbe:
    # Enables the readinessProbe.
    enabled: true
    # Configures the readinessProbe.
    initialDelaySeconds: 5
    timeoutSeconds: 2
    periodSeconds: 10
    failureThreshold: 3

  # nodeSelector, affinity, and tolerations for the `api` pod running dask-gateway-server
  nodeSelector: {}
  affinity: {}
  tolerations: []

  # Any extra configuration code to append to the generated `dask_gateway_config.py`
  # file. Can be either a single code-block, or a map of key -> code-block
  # (code-blocks are run in alphabetical order by key, the key value itself is
  # meaningless). The map version is useful as it supports merging multiple
  # `values.yaml` files, but is unnecessary in other cases.
  extraConfig: {}
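  # e.g. (hypothetical sketch of the map form; the key name is arbitrary and the
  # value of each key is Python code appended to dask_gateway_config.py):
  # extraConfig:
  #   10-cluster-options: |
  #     from dask_gateway_server.options import Options, Integer
  #     c.Backend.cluster_options = Options(
  #         Integer("worker_cores", default=1, min=1, max=4, label="Worker cores"),
  #     )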

  # backend nested configuration relates to the scheduler and worker resources
  # created for DaskCluster k8s resources by the controller.
  backend:
    # The image to use for both schedulers and workers.
    image:
      name: ghcr.io/dask/dask-gateway
      tag: "2024.1.0"
      pullPolicy: IfNotPresent

    # Image pull secrets for a dask cluster's scheduler and worker pods
    imagePullSecrets: []

    # The namespace to launch dask clusters in. If not specified, defaults to
    # the same namespace the gateway is running in.
    namespace: default

    # A mapping of environment variables to set for both schedulers and workers.
    environment: {}

    scheduler:
      # Any extra configuration for the scheduler pod. Sets
      # `c.KubeClusterConfig.scheduler_extra_pod_config`.
      extraPodConfig: {}

      # Any extra configuration for the scheduler container.
      # Sets `c.KubeClusterConfig.scheduler_extra_container_config`.
      extraContainerConfig: {}

      # Cores request/limit for the scheduler.
      cores:
        request: 0.5
        limit: 1

      # Memory request/limit for the scheduler.
      memory:
        request: "500M"
        limit: "1G"

    worker:
      # Any extra configuration for the worker pod. Sets
      # `c.KubeClusterConfig.worker_extra_pod_config`.
      extraPodConfig: {}

      # Any extra configuration for the worker container. Sets
      # `c.KubeClusterConfig.worker_extra_container_config`.
      extraContainerConfig: {}

      # Cores request/limit for each worker.
      cores:
        request: 0.5
        limit: 1

      # Memory request/limit for each worker.
      memory:
        request: "250M"
        limit: "500M"

      # Number of threads available for a worker. Sets
      # `c.KubeClusterConfig.worker_threads`
      threads: 2


# controller nested config relates to the controller Pod and the
# dask-gateway-server running within it that makes things happen when changes to
# DaskCluster k8s resources are observed.
controller:
  # Whether the controller should be deployed. Disabling the controller allows
  # running it locally for development/debugging purposes.
  enabled: true

  # Any annotations to add to the controller pod
  annotations: {}

  # Resource requests/limits for the controller pod
  resources:
    requests:
      memory: "256M"
      cpu: 0.5
    limits:
      memory: "1G"
      cpu: 1

  # Image pull secrets for controller pod
  imagePullSecrets: []

  # The controller log level
  loglevel: DEBUG

  # Max time (in seconds) to keep around records of completed clusters.
  # Default is 24 hours.
  completedClusterMaxAge: 86400

  # Time (in seconds) between cleanup tasks removing records of completed
  # clusters. Default is 5 minutes.
  completedClusterCleanupPeriod: 600

  # Base delay (in seconds) for backoff when retrying after failures.
  backoffBaseDelay: 0.1

  # Max delay (in seconds) for backoff when retrying after failures.
  backoffMaxDelay: 300

  # Limit on the average number of k8s api calls per second.
  k8sApiRateLimit: 50

  # Limit on the maximum number of k8s api calls per second.
  k8sApiRateLimitBurst: 100

  # The image to use for the controller pod.
  image:
    name: ghcr.io/dask/dask-gateway-server
    tag: "2024.1.0"
    pullPolicy: IfNotPresent

  # Settings for nodeSelector, affinity, and tolerations for the controller pods
  nodeSelector: {}
  affinity: {}
  tolerations: []



# traefik nested config relates to the traefik Pod and Traefik running within it
# that is acting as a proxy for traffic towards the gateway or user created
# DaskCluster resources.
traefik:
  # Number of instances of the proxy to run
  replicas: 1

  # Any annotations to add to the proxy pods
  annotations: {}

  # Resource requests/limits for the proxy pods
  resources:
    requests:
      memory: "256M"
      cpu: 0.5
    limits:
      memory: "1G"
      cpu: 1

  # The image to use for the proxy pod
  image:
    name: traefik
    tag: "2.10.6"
    pullPolicy: IfNotPresent
  imagePullSecrets: []

  # Any additional arguments to forward to traefik
  additionalArguments: []

  # The proxy log level
  loglevel: DEBUG

  # Whether to expose the dashboard on port 9000 (enable for debugging only!)
  dashboard: true

  # Additional configuration for the traefik service
  service:
    type: LoadBalancer
    annotations: {}
    spec: {}
    ports:
      web:
        # The port HTTP(s) requests will be served on
        port: 80
        nodePort: 30081
      tcp:
        # The port TCP requests will be served on. Set to `web` to share the
        # web service port
        port: web
        nodePort: 30081

  # Settings for nodeSelector, affinity, and tolerations for the traefik pods
  nodeSelector: {}
  affinity: {}
  tolerations: []



# rbac nested configuration relates to the choice of creating or replacing
# resources like (Cluster)Role, (Cluster)RoleBinding, and ServiceAccount.
rbac:
  # Whether to enable RBAC.
  enabled: true

  # Existing names to use if ClusterRoles, ClusterRoleBindings, and
  # ServiceAccounts have already been created by other means (leave set to
  # `null` to create all required roles at install time)
  controller:
    serviceAccountName:

  gateway:
    serviceAccountName:

  traefik:
    serviceAccountName:



# global nested configuration is accessible by all Helm charts that may depend
# on each other, but not used by this Helm chart. An entry is created here to
# validate its use and catch YAML typos via this configuration's associated JSON
# schema.
global: {}
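
For reference, the traefik service above routes scheduler TCP traffic over the same web entrypoint it uses for HTTP (tcp.port: web, nodePort 30081), so the client only needs the single address already used in the script. A minimal, hedged sketch of the corresponding client-side connection (proxy_address is commented out as an assumption about the explicit form of the value derived from address when left unset):

from dask_gateway import Gateway
from dask_gateway.auth import BasicAuth

gateway = Gateway(
    address="http://194.12.178.4:30081",
    # proxy_address="gateway://194.12.178.4:30081",  # assumed explicit form; derived from `address` when unset
    auth=BasicAuth(username="alputer", password="dummy"),
)
print(gateway.list_clusters())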

Hi @Alputer, welcome to Dask community!

Just to be sure, where do you print the client object? I don’t see it in your code. It can take time for the client to be aware of its resources.

Is your code stuck on the compute() line? Does it ever end?

Another thing, from where are you running the main Python script? From outside the K8S cluster?

Yes, I am running the Python script outside the k8s cluster. It is stuck on the compute() call and never ends; it appears to hang forever. I made a copy-paste mistake and somehow deleted the print statement; my code actually looks like this:

from dask_gateway import Gateway
from dask_gateway.auth import BasicAuth

gateway = Gateway(
    address="http://194.12.178.4:30081",
    auth=BasicAuth(username="alputer", password="dummy"),
)
gateway.list_clusters()

cluster = gateway.new_cluster()
cluster.scale(2)


client = cluster.get_client()
print(client)

import dask.array as da

a = da.random.normal(size=(1000, 1000), chunks=(500, 500))

a.mean().compute()

By the way, I tried to troubleshoot a bit more today and noticed the following behaviour, which might be the reason why the scheduler and worker pods cannot talk to each other: the scheduler's pod name and the address in the worker's connection log differ. The pod name contains the word "scheduler" in the middle, but the name the worker pod is trying to connect to does not.

With dask-gateway (after installing the dask-gateway chart and running the code from the client):

$ kubectl logs dask-worker-81646b2027f146fc9f3f3db885c90482-qz9ms
...
2024-06-21 09:59:14,342 - distributed.worker - INFO -         Registered to: tls://dask-81646b2027f146fc9f3f3db885c90482.default:8786
2024-06-21 09:59:14,343 - distributed.worker - INFO - -------------------------------------------------
2024-06-21 09:59:14,343 - distributed.core - INFO - Starting established connection to tls://dask-81646b2027f146fc9f3f3db885c90482.default:8786
$ kubectl get pods
NAME                                        READY   STATUS    RESTARTS   AGE
...
dask-scheduler-81646b2027f146fc9f3f3db885c90482  1/1     Running   0          3m

I am also adding the result of a classic dask chart deployment below. When we install dask without dask-gateway and look at the worker pod logs, we see this output.
With classic dask (after installing the dask chart):

$ kubectl logs dask-worker-6cf74bbb49-57jhj
...
2024-06-21 09:08:29,764 - distributed.worker - INFO - -------------------------------------------------
2024-06-21 09:08:30,461 - distributed.worker - INFO -         Registered to:  tcp://dask-scheduler:8786
2024-06-21 09:08:30,462 - distributed.worker - INFO - -------------------------------------------------
2024-06-21 09:08:30,462 - distributed.core - INFO - Starting established connection to tcp://dask-scheduler:8786

Looking at these logs, it seems the Workers are able to connect to the Scheduler in both cases.

What do you have in the Scheduler logs, does it receive the computation and associated tasks?
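
If it helps, the scheduler and worker logs can also be pulled through the connected client instead of kubectl; a minimal sketch, assuming client is the object created in the script above:

print(client.get_scheduler_logs(n=50))  # most recent scheduler log entries
print(client.get_worker_logs(n=50))     # most recent entries from each worker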

Scheduler logs look like this:

alputer@alputer:~$ k logs dask-worker-1749472329454cea847e023773a644fa-fwshn 
/home/dask/.local/lib/python3.11/site-packages/distributed/cli/dask_worker.py:266: FutureWarning: dask-worker is deprecated and will be removed in a future release; use `dask worker` instead
  warnings.warn(
2024-06-23 14:56:39,603 - distributed.nanny - INFO -         Start Nanny at: 'tls://10.244.0.21:34971'
2024-06-23 14:56:40,490 - distributed.worker - INFO -       Start worker at:    tls://10.244.0.21:34101
2024-06-23 14:56:40,490 - distributed.worker - INFO -          Listening to:    tls://10.244.0.21:34101
2024-06-23 14:56:40,490 - distributed.worker - INFO -           Worker name: dask-worker-1749472329454cea847e023773a644fa-fwshn
2024-06-23 14:56:40,490 - distributed.worker - INFO -          dashboard at:           10.244.0.21:8787
2024-06-23 14:56:40,490 - distributed.worker - INFO - Waiting to connect to: tls://dask-1749472329454cea847e023773a644fa.default:8786
2024-06-23 14:56:40,490 - distributed.worker - INFO - -------------------------------------------------
2024-06-23 14:56:40,490 - distributed.worker - INFO -               Threads:                          2
2024-06-23 14:56:40,490 - distributed.worker - INFO -                Memory:                   2.00 GiB
2024-06-23 14:56:40,490 - distributed.worker - INFO -       Local Directory: /tmp/dask-scratch-space/worker-f_4jx7o0
2024-06-23 14:56:40,490 - distributed.worker - INFO - -------------------------------------------------
2024-06-23 14:56:40,629 - distributed.worker - INFO - Starting Worker plugin shuffle
2024-06-23 14:56:40,629 - distributed.worker - INFO -         Registered to: tls://dask-1749472329454cea847e023773a644fa.default:8786
2024-06-23 14:56:40,629 - distributed.worker - INFO - -------------------------------------------------
2024-06-23 14:56:40,629 - distributed.core - INFO - Starting established connection to tls://dask-1749472329454cea847e023773a644fa.default:8786

I believe these are the worker logs. Could you get the Scheduler logs? In any case, the Worker seems to correctly connect to the Scheduler.

Do you have anything like Istio installed on your cluster?