boto3无法在pyspark worker上创建客户端?

8
我正在尝试使用boto3与AWS通信,将Pyspark RDD的数据从工作节点发送到SQS队列。我需要直接从分区发送数据,而不是从驱动程序收集RDD并发送数据。
我可以通过boto3在本地和Spark驱动程序中向SQS发送消息;同时,我可以在分区上导入boto3并创建一个boto3会话。然而,当我尝试从分区创建客户端或资源时,我收到一个错误。我认为boto3没有正确地创建客户端,但我不完全确定。我的代码看起来像这样:
def get_client(x):   #the x is required to use pyspark's mapPartitions
    import boto3
    client = boto3.client('sqs', region_name="us-east-1", aws_access_key_id="myaccesskey", aws_secret_access_key="mysecretaccesskey")
    return x

rdd_with_client = rdd.mapPartitions(get_client)

错误信息:
DataNotFoundError: Unable to load data for: endpoints

更长的回溯信息:
File "<stdin>", line 4, in get_client
  File "./rebuilt.zip/boto3/session.py", line 250, in client
    aws_session_token=aws_session_token, config=config)
  File "./rebuilt.zip/botocore/session.py", line 810, in create_client
    endpoint_resolver = self.get_component('endpoint_resolver')
  File "./rebuilt.zip/botocore/session.py", line 691, in get_component
    return self._components.get_component(name)
  File "./rebuilt.zip/botocore/session.py", line 872, in get_component
    self._components[name] = factory()
  File "./rebuilt.zip/botocore/session.py", line 184, in create_default_resolver
    endpoints = loader.load_data('endpoints')
  File "./rebuilt.zip/botocore/loaders.py", line 123, in _wrapper
    data = func(self, *args, **kwargs)
  File "./rebuilt.zip/botocore/loaders.py", line 382, in load_data
    raise DataNotFoundError(data_path=name)
DataNotFoundError: Unable to load data for: endpoints

我还尝试修改了我的函数,创建资源而不是显式客户端,看看它是否能找到并使用默认的客户端设置。在这种情况下,我的代码如下:

def get_resource(x):
    import boto3
    sqs = boto3.resource('sqs', region_name="us-east-1", aws_access_key_id="myaccesskey", aws_secret_access_key="mysecretaccesskey")
    return x

rdd_with_client = rdd.mapPartitions(get_resource)

我收到一个指向has_low_level_client参数的错误,因为客户端不存在而被触发;traceback显示:

File "/usr/lib/spark/python/pyspark/rdd.py", line 2253, in pipeline_func
  File "/usr/lib/spark/python/pyspark/rdd.py", line 270, in func
  File "/usr/lib/spark/python/pyspark/rdd.py", line 689, in func
  File "<stdin>", line 4, in session_resource
  File "./rebuilt.zip/boto3/session.py", line 329, in resource
    has_low_level_client)
ResourceNotExistsError: The 'sqs' resource does not exist.
The available resources are:
   -

由于没有客户端来容纳资源,所以没有可用资源。

我已经苦思冥想了几天,需要任何帮助!


请找到 botocore 安装的位置并检查 data 子目录。您还应确保具有从磁盘读取数据的能力。 - Jordon Phillips
嗨Jordan,我在数据子目录中要找什么?我在那里有一个名为endpoints.json的文件,但这似乎是与此回溯相关的全部内容。 - EmmaOnThursday
由于某种原因,botocore 无法访问 endpoints.json 文件,而 boto3 同样无法访问其目录中的数据。我的想法是它要么根本不存在,要么您的环境阻止了访问。 - Jordon Phillips
1个回答

13
这是因为您将boto3捆绑包作为zip文件。

"./rebuilt.zip/boto3"

boto3在初始化时会下载一堆文件并将其保存在分发文件夹中。因为你的boto3位于一个压缩包中,所以显然这些文件无法到达那里。

解决方法是,不要将boto3分发到zip中,而是应该在Spark环境中安装boto3。在这里要小心,你可能需要在主节点和工作节点上都安装boto3,这取决于你如何实现你的应用程序。最保险的做法是在两者上都安装。

如果你正在使用EMR,可以使用引导步骤来完成。这里是详细文档

如果你正在使用AWS Glue 2.0,可以使用--additional-python-modules来包含boto3。这里是详细文档

如果您正在使用GCP Dataproc,可以通过指定集群属性来存档。这里是详细文档

嗯,我在这里遇到了同样的问题,这真的很糟糕。不过我会回退到boto上,因为它默认安装在emr集群上。 - avocado
或者,你可以使用引导操作运行“pip install boto3”。毋需回退到boto,因为它即将被弃用。 - Tom Tang
虽然听起来有点“hacky”,但它可能有效。那些JSON文件可以在初始化后下载。所以你可以使用boto3初始化你使用的AWS服务,并将完全相同的boto3副本打包到你的软件包中。 - Tom Tang
但我更倾向于我的答案建议的方法。在您的集群中安装boto3到系统python中,而不要将boto3打包到zip文件中。当导入boto3时,它将使用在系统python中安装的boto3,并且那些数据文件应该得到适当处理。 - Tom Tang
1
从zip文件中删除boto3和botocore,并包含此参数“--additional-python-modules boto3”解决了该问题。谢谢。 - sgalinma
显示剩余3条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接