How to pass credentials to workers when they change every hour

I am facing a problem passing AWS credentials to workers.

I am using a complicated mechanism in client-side code that requires authenticating with Kerberos, which then provides me with the AWS credentials. When the old AWS credentials expire, I have to re-authenticate with Kerberos to get new ones.

How should I pass AWS credentials from the parent script to the workers?

  1. Pass these credentials to workers when they are created
  2. Refresh these credentials on running workers

I have tried running this code when a worker starts:

import os

def set_aws_credentials(aws_access_key_id, aws_secret_access_key, aws_session_token):
    import os
    os.environ["AWS_ACCESS_KEY_ID"] = aws_access_key_id
    os.environ["AWS_SECRET_ACCESS_KEY"] = aws_secret_access_key
    os.environ["AWS_SESSION_TOKEN"] = aws_session_token
    print(f"AWS credentials: {os.getenv('AWS_ACCESS_KEY_ID')}")

aws_access_key_id = os.getenv("AWS_ACCESS_KEY_ID")
aws_secret_access_key = os.getenv("AWS_SECRET_ACCESS_KEY")
aws_session_token = os.getenv("AWS_SESSION_TOKEN")
self.client.register_worker_callbacks(
    set_aws_credentials(aws_access_key_id, aws_secret_access_key, aws_session_token)
)

I have also tried passing these values as environment variables in the worker spec:

credentials = {
    "AWS_ACCESS_KEY_ID": os.getenv("AWS_ACCESS_KEY_ID"),
    "AWS_SECRET_ACCESS_KEY": os.getenv("AWS_SECRET_ACCESS_KEY"),
    "AWS_SESSION_TOKEN": os.getenv("AWS_SESSION_TOKEN"),
}
env_credentials = [{"name": key, "value": value} for key, value in credentials.items()]
worker_spec = make_worker_spec(n_workers=0, env=env_credentials)
worker_spec["spec"]["nodeSelector"] = {"dc_profiler": "true"}
self.cluster.add_worker_group(name=self.cluster.name, custom_spec=worker_spec, env=env_credentials)

When the workers start and I inspect the pod definition with kubectl, I do not see the environment variables.

spec:
  containers:
  - args:
    - dask-worker
    - --name
    - $(DASK_WORKER_NAME)
    - --dashboard
    - --dashboard-address
    - "8788"
    env:
    - name: DASK_WORKER_NAME
      value: tablestats-lpak-default-worker-8b4a38f202
    - name: DASK_SCHEDULER_ADDRESS
      value: tcp://tablestats-lpak-scheduler.dc-dataload-vr-cdl-2.svc.cluster.local:8786
    image: ghcr.io/dask/dask:2024.1.0-py3.11
    imagePullPolicy: IfNotPresent
    name: worker

Can you please help me with what I am doing wrong?

Hi @rvarunrathod,

First: how are you starting your Dask cluster? I’m not sure updating environment variables after the workers have started would be enough…

Did you manage to get it working at least without refreshing them upon expiration?

There is some description of how to do this with EC2Cluster (see the Amazon Web Services (AWS) page in the Dask Cloud Provider documentation), but I’m not sure you are using it.

Perhaps an alternative would be to use libraries such as fsspec, which can hold the credentials for you. What are you then doing with these credentials?
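For instance, rather than relying on environment variables on the workers, the credentials could be passed explicitly through storage_options, which s3fs accepts as key / secret / token. A minimal sketch, reusing the variables from your first snippet; the bucket path is a placeholder:

import dask.dataframe as dd

df = dd.read_csv(
    "s3://my-bucket/path/to/files/*",  # placeholder path
    storage_options={
        "key": aws_access_key_id,
        "secret": aws_secret_access_key,
        "token": aws_session_token,
    },
)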

cc @jacobtomlinson @martindurant

Updating environment variables will not have any effect on a process that is already running. There should be a way to update a file, though; Kubernetes has facilities for that (but don’t ask me the syntax). The typical file for this is ~/.aws/credentials, but I’m not certain that a running session will re-check this file, even when getting a credentials-expired error. You may need to restart your workers every time you update the credentials. If you do this in a rolling fashion, then maybe Dask will be able to copy partial results around so you don’t lose work.
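To illustrate the file-based approach, a minimal sketch that pushes a fresh ~/.aws/credentials file to every running worker with client.run; the write_credentials helper and the new_* variables are made up for the example, and whether a running s3fs session picks the file up afterwards is exactly the open question above:

import os

def write_credentials(key_id, secret, token):
    # Runs on each worker: overwrite ~/.aws/credentials with the new values.
    path = os.path.expanduser("~/.aws/credentials")
    os.makedirs(os.path.dirname(path), exist_ok=True)
    with open(path, "w") as f:
        f.write(
            "[default]\n"
            f"aws_access_key_id = {key_id}\n"
            f"aws_secret_access_key = {secret}\n"
            f"aws_session_token = {token}\n"
        )

# After re-authenticating with Kerberos and obtaining new credentials:
client.run(write_credentials, new_key_id, new_secret, new_token)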


Hi @martindurant, I am running a long job in which I fetch data from S3 and compute on it. As soon as the computation on this data is finished, I refresh the credentials and do the same again. I am getting a Bad Request error from the client side. I think Dask is not picking up the new credentials.

File "/opt/conda/lib/python3.11/site-packages/s3fs/core.py", line 113, in _error_wrapper
  File "/opt/conda/lib/python3.11/site-packages/aiobotocore/client.py", line 411, in _make_api_call
botocore.exceptions.ClientError: An error occurred (400) when calling the HeadObject operation: Bad Request

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/app/profilers/table_stats/table_stats.py", line 73, in process_asset
    total_row_count = len(dask_df.index)
                      ^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/dask/dataframe/core.py", line 997, in __len__
    ).compute()
      ^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/dask/base.py", line 379, in compute
    (result,) = compute(self, traverse=False, **kwargs)
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/dask/base.py", line 665, in compute
    results = schedule(dsk, keys, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/lib/python3.11/site-packages/dask/bytes/core.py", line 191, in read_block_from_file
  File "/opt/conda/lib/python3.11/site-packages/fsspec/core.py", line 103, in __enter__
  File "/opt/conda/lib/python3.11/site-packages/fsspec/spec.py", line 1293, in open
  File "/opt/conda/lib/python3.11/site-packages/s3fs/core.py", line 685, in _open
  File "/opt/conda/lib/python3.11/site-packages/s3fs/core.py", line 2179, in __init__
  File "/opt/conda/lib/python3.11/site-packages/fsspec/spec.py", line 1651, in __init__
  File "/opt/conda/lib/python3.11/site-packages/fsspec/spec.py", line 1664, in details
  File "/opt/conda/lib/python3.11/site-packages/fsspec/asyn.py", line 118, in wrapper
  File "/opt/conda/lib/python3.11/site-packages/fsspec/asyn.py", line 103, in sync
  File "/opt/conda/lib/python3.11/site-packages/fsspec/asyn.py", line 56, in _runner
  File "/opt/conda/lib/python3.11/site-packages/s3fs/core.py", line 1371, in _info
  File "/opt/conda/lib/python3.11/site-packages/s3fs/core.py", line 362, in _call_s3
  File "/opt/conda/lib/python3.11/site-packages/s3fs/core.py", line 142, in _error_wrapper
OSError: [Errno 22] Bad Request

I have also provided the option "skip_instance_cache": True in storage_options for read_csv.

I have also provided “skip_instance_cache”: True

I would have thought this does what you want: each instance will be created anew and will use whatever credentials are found at the time. You can turn on the "s3fs" logger to see whether this is happening or not.
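For reference, enabling the logger and passing the option could look something like this (a sketch; the path is a placeholder):

import logging
import dask.dataframe as dd

# Show s3fs activity, including when new filesystem instances are created
logging.basicConfig()
logging.getLogger("s3fs").setLevel(logging.DEBUG)

df = dd.read_csv(
    "s3://my-bucket/path/to/files/*",  # placeholder path
    storage_options={"skip_instance_cache": True},
)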

Do we have documentation on how Dask’s internals work? I want to know, when I call dataframe.read_csv("s3a://path/to/files/**"), how it distributes this task to the workers.

The list of files to read is first determined in the local client, so you need to make sure that you can access the data there first. Looking at your traceback again, I’m not sure whether that error is happening on the client or in a worker.

Once that is done, the filesystem itself and path are serialised for each read task, and rehydrated on the workers, essentially passing around the same storage_options kwargs to every task. fsspec would normally pick from its internal cache of instances if one matched, but skip_instance_cache will prevent that.
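A quick way to check the client side separately is to list the files with fsspec on the client before calling read_csv (a sketch; the path is a placeholder):

import fsspec

# On the client: can we list the files at all with the current credentials?
fs = fsspec.filesystem("s3", skip_instance_cache=True)
print(fs.glob("my-bucket/path/to/files/*"))  # placeholder path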

Hi @martindurant, you are right. This error is from the Dask workers; so how do we invalidate the s3fs instance on the Dask workers?
Let me explain what I am doing in code.
When my job starts, I get AWS credentials, create a Dask Kubernetes cluster, and mount an AWS credentials file into the worker pods. Then I fetch data and process it. After that, I delete the WorkerGroup (not the DaskCluster), refresh the credentials, create a new WorkerGroup, and mount the new credentials to the workers. Then I fetch and process data again.
But I think the credentials are getting cached in the tasks. Is there any way to refresh the credentials?

At the time you update the credentials, you might want to call:

client.run(s3fs.S3FileSystem.clear_instance_cache)

This might be enough.
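Putting it together, a refresh routine could look roughly like this; it reuses the set_aws_credentials function from the first post, and the new_* variables are placeholders for the freshly obtained credentials:

import s3fs

def refresh_workers(client, key_id, secret, token):
    # Push the new values into each worker's environment ...
    client.run(set_aws_credentials, key_id, secret, token)
    # ... then drop cached S3FileSystem instances so the next task builds
    # a fresh one with the new credentials.
    client.run(s3fs.S3FileSystem.clear_instance_cache)

refresh_workers(client, new_key_id, new_secret, new_token)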
