为什么我有时使用SQS客户端会出现Key Error错误?

9

我正在使用boto3 SQS客户端从AWS SQS FIFO队列中接收消息。

def consume_msgs():
    sqs = None
    try:
        sqs = boto3.client('sqs',
                       region_name=S3_BUCKET_REGION,
                       aws_access_key_id=AWS_ACCESS_KEY_ID,
                       aws_secret_access_key=AWS_SECRET_ACCESS_KEY)
    except Exception:
        logger.warning('SQS client error {}'.format(sys.exc_info()[0]))
        logger.error(traceback.format_exc())

  ### more code to process message

应用程序通过使用upstart在EC2上设置为服务。大部分时间它都运行良好。但有时当我在代码更改后重新启动服务时,应用程序会因以下错误而退出。
2018-10-06 01:29:38,654 WARNING SQS client error <class 'KeyError'>
2018-10-06 01:29:38,658 WARNING SQS client error <class 'KeyError'>
2018-10-06 01:29:38,663 ERROR Traceback (most recent call last):
  File "/home/ec2-user/aae_client/app/run.py", line 194, in consume_msgs
    aws_secret_access_key=AWS_SECRET_ACCESS_KEY)
  File "/home/ec2-user/aae_client/env/lib64/python3.6/dist-packages/boto3/__init__.py", line 83, in client
    return _get_default_session().client(*args, **kwargs)
  File "/home/ec2-user/aae_client/env/lib64/python3.6/dist-packages/boto3/session.py", line 263, in client
    aws_session_token=aws_session_token, config=config)
  File "/home/ec2-user/aae_client/env/lib64/python3.6/dist-packages/botocore/session.py", line 851, in create_client
    endpoint_resolver = self.get_component('endpoint_resolver')
  File "/home/ec2-user/aae_client/env/lib64/python3.6/dist-packages/botocore/session.py", line 726, in get_component
    return self._components.get_component(name)
  File "/home/ec2-user/aae_client/env/lib64/python3.6/dist-packages/botocore/session.py", line 926, in get_component
    del self._deferred[name]
KeyError: 'endpoint_resolver'

重新启动服务通常可以解决此问题。并不是每次重新启动服务都会出现此问题。令人困惑的是,KeyError警告引导了实际的错误回溯。这个KeyError具体指的是什么?它不能是AWS_SECRET_ACCESS_KEY,因为这个密钥从未改变,并且大多数情况下都能正常工作。该问题发生得相当随机,并且来去无踪,因此很难调试。我不明白这个错误是如何逃过try..except块的。

编辑

根据评论,这似乎与多线程有关。确实有多个线程运行consume_msg函数: def process_msgs():
for i in range(NUM_WORKERS):
    t = threading.Thread(target=consume_msgs, name='worker-%s' % i)
    t.setDaemon(True)
    t.start()
while True:
    time.sleep(MAIN_PROCESS_SLEEP_INTERVAL)
3个回答

14

也许我误解了其他答案的一些内容,但是在多线程执行的情况下,如果这些函数在不同的线程中执行,我不认为拥有一个boto3客户端对象并将其传递给其他函数会起作用。 我一直在遇到间歇性的endpoint_resolver错误调用boto3客户端服务,通过遵循文档中的示例以及对诸如#1246#1592boto3 GitHub问题的评论进行了停止,并在每个线程中创建单独的会话对象。 在我的情况下,这意味着代码几乎可以忽略不计的更改,从

client = boto3.client(variant, region_name = creds['region_name'],
                      aws_access_key_id = ...,
                      aws_secret_access_key = ...)
session = boto3.session.Session()
client = session.client(variant, region_name = creds['region_name'],
                        aws_access_key_id = ...,
                        aws_secret_access_key = ...)
在执行分离线程的函数中,我对 OP 的 consume_msgs() 代码的理解是可以进行类似的更改,这样可以消除偶尔出现的 endpoint_resolver 错误。

1
实际上,仅创建一个boto客户端并不是线程安全的。在单独的线程中使用一个客户端是可以的。另一方面,在单独的线程中使用相同的资源和会话是不安全的。还有一个问题:在每个线程中创建一个会话非常耗时。您可以在文档中找到多线程客户端示例。 - Filip Kwiatkowski

3

这个Github问题建议你在顶层设置sqs客户机(而不是在函数中):

sqs = boto3.client('sqs',
                   region_name=S3_BUCKET_REGION,
                   aws_access_key_id=AWS_ACCESS_KEY_ID,
                   aws_secret_access_key=AWS_SECRET_ACCESS_KEY)


def consume_msgs():
    # code to process message

这个问题提到了同样的事情 / 同样的解决方案。 - Andy Hayden
consume_msg 被多个线程运行(请参见我的新编辑)。将 sqs client 移出此函数将使其最终在主线程中结束。每个线程仍然需要自己的 sqs 客户端,对吗? - ddd
1
您提供的链接建议为每个线程创建session,而不是让客户端共享同一个session。类似于boto3.Session().client这样的方式。 - ddd
@ddd 我认为你误读了,或者至少有一些评论建议在使用线程时将其放置在顶层(并在主线程中)。 - Andy Hayden
1
你也可以将轮询移动到主线程,并仅在工作线程上进行处理。听起来,对于boto3来说,多个sqs客户端和线程的组合是不好的(可能存在错误)。 - Andy Hayden

0

在创建S3客户端时,我遇到了这个错误,但据我所知,这是相同的问题。 在创建客户端的过程中,存在非线程安全的代码:

        if name in self._deferred:
            factory = self._deferred[name]
            self._components[name] = factory()
            # Only delete the component from the deferred dict after
            # successfully creating the object from the factory as well as
            # injecting the instantiated value into the _components dict.
            del self._deferred[name]

(来自botocore/session.py中的get_component方法 - 当尝试删除被不同线程删除的键时引发KeyError的代码)

锁定客户端创建对我有用(如https://github.com/boto/boto3/pull/806中建议的那样)


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接