我正在尝试使用boto3与AWS通信,将Pyspark RDD的数据从工作节点发送到SQS队列。我需要直接从分区发送数据,而不是从驱动程序收集RDD并发送数据。
我可以通过boto3在本地和Spark驱动程序中向SQS发送消息;同时,我可以在分区上导入boto3并创建一个boto3会话。然而,当我尝试从分区创建客户端或资源时,我收到一个错误。我认为boto3没有正确地创建客户端,但我不完全确定。我的代码看起来像这样:
错误信息:
更长的回溯信息:
我可以通过boto3在本地和Spark驱动程序中向SQS发送消息;同时,我可以在分区上导入boto3并创建一个boto3会话。然而,当我尝试从分区创建客户端或资源时,我收到一个错误。我认为boto3没有正确地创建客户端,但我不完全确定。我的代码看起来像这样:
def get_client(x): #the x is required to use pyspark's mapPartitions
import boto3
client = boto3.client('sqs', region_name="us-east-1", aws_access_key_id="myaccesskey", aws_secret_access_key="mysecretaccesskey")
return x
rdd_with_client = rdd.mapPartitions(get_client)
错误信息:
DataNotFoundError: Unable to load data for: endpoints
更长的回溯信息:
File "<stdin>", line 4, in get_client
File "./rebuilt.zip/boto3/session.py", line 250, in client
aws_session_token=aws_session_token, config=config)
File "./rebuilt.zip/botocore/session.py", line 810, in create_client
endpoint_resolver = self.get_component('endpoint_resolver')
File "./rebuilt.zip/botocore/session.py", line 691, in get_component
return self._components.get_component(name)
File "./rebuilt.zip/botocore/session.py", line 872, in get_component
self._components[name] = factory()
File "./rebuilt.zip/botocore/session.py", line 184, in create_default_resolver
endpoints = loader.load_data('endpoints')
File "./rebuilt.zip/botocore/loaders.py", line 123, in _wrapper
data = func(self, *args, **kwargs)
File "./rebuilt.zip/botocore/loaders.py", line 382, in load_data
raise DataNotFoundError(data_path=name)
DataNotFoundError: Unable to load data for: endpoints
我还尝试修改了我的函数,创建资源而不是显式客户端,看看它是否能找到并使用默认的客户端设置。在这种情况下,我的代码如下:
def get_resource(x):
import boto3
sqs = boto3.resource('sqs', region_name="us-east-1", aws_access_key_id="myaccesskey", aws_secret_access_key="mysecretaccesskey")
return x
rdd_with_client = rdd.mapPartitions(get_resource)
我收到一个指向has_low_level_client参数的错误,因为客户端不存在而被触发;traceback显示:
File "/usr/lib/spark/python/pyspark/rdd.py", line 2253, in pipeline_func
File "/usr/lib/spark/python/pyspark/rdd.py", line 270, in func
File "/usr/lib/spark/python/pyspark/rdd.py", line 689, in func
File "<stdin>", line 4, in session_resource
File "./rebuilt.zip/boto3/session.py", line 329, in resource
has_low_level_client)
ResourceNotExistsError: The 'sqs' resource does not exist.
The available resources are:
-
由于没有客户端来容纳资源,所以没有可用资源。
我已经苦思冥想了几天,需要任何帮助!
data
子目录。您还应确保具有从磁盘读取数据的能力。 - Jordon Phillipsendpoints.json
文件,而boto3
同样无法访问其目录中的数据。我的想法是它要么根本不存在,要么您的环境阻止了访问。 - Jordon Phillips