当Python脚本无限运行时,如何刷新boto3凭证

13
我正在尝试编写一个使用watchdog的Python脚本,用于查找文件创建并使用boto3将其上传到S3。然而,我的boto3凭证每12小时过期一次,所以我需要更新它们。我将我的boto3凭证存储在~/.aws/credentials中。所以现在我正在尝试捕获S3UploadFailedError,更新凭证,并将它们写入~/.aws/credentials。但是尽管凭证已经更新,我再次调用boto3.client('s3')时仍然会抛出异常。
我做错了什么?或者我该如何解决这个问题?
以下是代码片段:
try:
     s3 = boto3.client('s3')
     s3.upload_file(event.src_path,'bucket-name',event.src_path)

except boto3.exceptions.S3UploadFailedError as e:
     print(e)
     get_aws_credentials()
     s3 = boto3.client('s3')

5个回答

16
我找到了一个很好的例子来刷新这个链接中的凭证: https://pritul95.github.io/blogs/boto3/2020/08/01/refreshable-boto3-session/ 但是里面有一个小bug。请注意。这是修正后的代码:
from uuid import uuid4
from datetime import datetime
from time import time

import pytz
from boto3 import Session
from botocore.credentials import RefreshableCredentials
from botocore.session import get_session


class RefreshableBotoSession:
    """
    Boto Helper class which lets us create a refreshable session so that we can cache the client or resource.

    Usage
    -----
    session = RefreshableBotoSession().refreshable_session()

    client = session.client("s3") # we now can cache this client object without worrying about expiring credentials
    """

    def __init__(
        self,
        region_name: str = None,
        profile_name: str = None,
        sts_arn: str = None,
        session_name: str = None,
        session_ttl: int = 3000
    ):
        """
        Initialize `RefreshableBotoSession`

        Parameters
        ----------
        region_name : str (optional)
            Default region when creating a new connection.

        profile_name : str (optional)
            The name of a profile to use.

        sts_arn : str (optional)
            The role arn to sts before creating a session.

        session_name : str (optional)
            An identifier for the assumed role session. (required when `sts_arn` is given)

        session_ttl : int (optional)
            An integer number to set the TTL for each session. Beyond this session, it will renew the token.
            50 minutes by default which is before the default role expiration of 1 hour
        """

        self.region_name = region_name
        self.profile_name = profile_name
        self.sts_arn = sts_arn
        self.session_name = session_name or uuid4().hex
        self.session_ttl = session_ttl

    def __get_session_credentials(self):
        """
        Get session credentials
        """
        session = Session(region_name=self.region_name, profile_name=self.profile_name)

        # if sts_arn is given, get credential by assuming the given role
        if self.sts_arn:
            sts_client = session.client(service_name="sts", region_name=self.region_name)
            response = sts_client.assume_role(
                RoleArn=self.sts_arn,
                RoleSessionName=self.session_name,
                DurationSeconds=self.session_ttl,
            ).get("Credentials")

            credentials = {
                "access_key": response.get("AccessKeyId"),
                "secret_key": response.get("SecretAccessKey"),
                "token": response.get("SessionToken"),
                "expiry_time": response.get("Expiration").isoformat(),
            }
        else:
            session_credentials = session.get_credentials().get_frozen_credentials()
            credentials = {
                "access_key": session_credentials.access_key,
                "secret_key": session_credentials.secret_key,
                "token": session_credentials.token,
                "expiry_time": datetime.fromtimestamp(time() + self.session_ttl).replace(tzinfo=pytz.utc).isoformat(),
            }

        return credentials

    def refreshable_session(self) -> Session:
        """
        Get refreshable boto3 session.
        """
        # Get refreshable credentials
        refreshable_credentials = RefreshableCredentials.create_from_metadata(
            metadata=self.__get_session_credentials(),
            refresh_using=self.__get_session_credentials,
            method="sts-assume-role",
        )

        # attach refreshable credentials current session
        session = get_session()
        session._credentials = refreshable_credentials
        session.set_config_variable("region", self.region_name)
        autorefresh_session = Session(botocore_session=session)

        return autorefresh_session

请注意,您需要安装pytz

太棒了,正是我所需要的,谢谢。 - tdebroc
2
如果有人出现“无法减去偏移量敏感和偏移量感知日期时间”的问题,则导入pytz并将"expiry_time": datetime.fromtimestamp(time() + self.session_ttl).isoformat()修改为"expiry_time": datetime.fromtimestamp(time() + self.session_ttl).replace(tzinfo=pytz.utc).isoformat()。 - Kulan Sachinthana

2
根据文档,客户端在几个位置查找凭证,并且还有其他更适合编程的选项可供考虑,而不是使用.aws/credentials文件。
引用文档:
搜索凭证的Boto3顺序如下:
  • 在boto.client()方法中作为参数传递凭证
  • 在创建Session对象时作为参数传递凭证
  • 环境变量
  • 共享凭证文件(~/.aws/credentials)
  • AWS配置文件(~/.aws/config)
  • 扮演角色提供者
在您的情况下,由于您已经捕获了异常并更新了凭证,因此我建议将新凭证传递给客户端的新实例,如下所示:
client = boto3.client(
    's3',
    aws_access_key_id=NEW_ACCESS_KEY,
    aws_secret_access_key=NEW_SECRET_KEY,
    aws_session_token=NEW_SESSION_TOKEN
)

如果您在代码中的其他地方使用相同的凭证来创建其他客户端,我建议将它们设置为环境变量:
import os

os.environ['AWS_ACCESS_KEY_ID'] = NEW_ACCESS_KEY
os.environ['AWS_SECRET_ACCESS_KEY'] = NEW_SECRET_KEY
os.environ['AWS_SESSION_TOKEN'] = NEW_SESSION_TOKEN

再次引用文档:

当您使用临时凭证时,您的AWS账户的会话密钥是必需的。


2
  1. 创建boto3会话
  2. 将其botocore凭据替换为DeferredRefreshableCredentials
from botocore.credentials import create_assume_role_refresher as carr
from botocore.credentials import DeferredRefreshableCredentials as DRC
from boto3 import Session

session = Session(region_name='us-east-1')
session._session._credentials=DRC(
            refresh_using=carr(session.client("sts"),
                               {'RoleArn':'your arn',
                               'RoleSessionName':'your name'}),
            method='sts-assume-role')

2

为什么会发生这种情况?

查看 boto 代码后,我们可以看到问题所在。 boto3.client(..) 函数调用了 _get_default_session(..)(第92行),我们可以看到 DEFAULT_SESSION 只被实例化一次(第80行),之后总是返回相同的会话(第79行和第83行)。

boto3 code

解决方案

  1. 我手动读取了~/.aws/credentials文件,并在实例化boto3客户端时传递了aws_access_key_idaws_secret_access_keyaws_session_token
  2. 每次调用时实例化boto3客户端
    • 在我的情况下,对boto3的调用次数相对较少
    • 否则,您可以仅在出现异常时实例化客户端
  3. 我创建并运行了一个小的bash脚本,在后台不断刷新AWS凭证并更新~/.aws/credentials文件
  4. 我的Python脚本正在运行30个并行进程,由于我正在读取~/.aws/credentials文件,该文件本身将每10分钟更新一次,因此存在读写期间的风险可能会导致中断。所以我只是在我的Python脚本中添加了一些非常基本的重试逻辑,以便它至少尝试每个失败的操作3次。这样它就变得具有读写期间问题的弹性。

代码片段

以下是我的Python脚本摘录,其中我在实例化客户端时读取了AWS凭证文件

import boto3
from typing import Dict, List, Optional, Callable, Tuple, Any, Union
import configparser

...

# helper method to read AWS Credentials file (which is in a standard INI file format)
def _read_aws_credentials():
    config: configparser.ConfigParser = configparser.ConfigParser()
    config.read('/home/alias/.aws/credentials')
    return config

...

    # read AWS credentials from credential file
    aws_credentials = _read_aws_credentials()
    # use credentials values read above to instantiate client
    client = boto3.client(
        "logs",
        region_name='us-east-1',
        aws_access_key_id=aws_credentials.get("default", "aws_access_key_id"),
        aws_secret_access_key=aws_credentials.get("default", "aws_secret_access_key"),
        aws_session_token=aws_credentials.get("default", "aws_session_token"))

(我无法分享凭据刷新的Bash脚本,因为它使用专有工具)

参考资料

我使用Linux screen来运行我的长时间运行的Python脚本(运行了大约2天)和AWS凭据刷新的Bash脚本

以下是我参考的一些StackOverflow主题

  1. 读取aws凭据文件的最佳方法

  2. 如何使用Python3读写INI文件?


1

这是我的实现,它使用单例设计模式仅在现有凭据过期时生成新凭据

import boto3
from datetime import datetime
from dateutil.tz import tzutc
import os
import binascii


class AssumeRoleProd:
    __credentials = None

    def __init__(self):
        assert True==False

    @staticmethod
    def __setCredentials():
        print("\n\n ======= GENERATING NEW SESSION TOKEN ======= \n\n")
        # create an STS client object that represents a live connection to the
        # STS service
        sts_client = boto3.client('sts')

        # Call the assume_role method of the STSConnection object and pass the role
        # ARN and a role session name.
        assumed_role_object = sts_client.assume_role(
            RoleArn=your_role_here,
            RoleSessionName=f"AssumeRoleSession{binascii.b2a_hex(os.urandom(15)).decode('UTF-8')}"
        )

        # From the response that contains the assumed role, get the temporary
        # credentials that can be used to make subsequent API calls
        AssumeRoleProd.__credentials = assumed_role_object['Credentials']

    @staticmethod
    def getTempCredentials():
        credsExpired = False

        # Return object for the first time 
        if AssumeRoleProd.__credentials is None:
            AssumeRoleProd.__setCredentials()
            credsExpired = True

        # Generate if only 5 minutes are left for expiry. You may setup for entire 60 minutes by catching botocore ClientException
        elif (AssumeRoleProd.__credentials['Expiration']-datetime.now(tzutc())).seconds//60<=5: 
            AssumeRoleProd.__setCredentials()
            credsExpired = True

        return AssumeRoleProd.__credentials

然后我也使用单例设计模式来为客户端生成一个新的客户端,只有在生成新会话时才会生成。如果需要,您还可以添加区域。

class lambdaClient:
    __prodClient = None

    def __init__(self):
        assert True==False

    @staticmethod
    def __initProdClient():
        credsExpired, credentials =  AssumeRoleProd.getTempCredentials()
        if lambdaClient.__prodClient is None or credsExpired:
            lambdaClient.__prodClient = boto3.client('lambda',
                                                       aws_access_key_id=credentials['AccessKeyId'],
                                                       aws_secret_access_key=credentials['SecretAccessKey'],
                                                       aws_session_token=credentials['SessionToken'])
        return lambdaClient.__prodClient


    @staticmethod
    def getProdClient():
        return lambdaClient.__initProdClient()

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接