AWS EMR: Pyspark: Rdd: mappartitions: 在搜索时找不到有效的 SPARK_HOME:Spark 闭包。

7

我有一个pyspark作业,本地运行时没有任何问题,但是当它在AWS集群上运行时,它在到达以下代码时就会卡住。该作业只处理100条记录。 "some_function"将数据发布到网站并在最后返回响应。有什么想法出了什么问题或者我该如何调试?另外,"Some_function"在类的外部,我猜问题与["闭包"][1]有关,但不确定如何解决。

response = attributes.mapPartitions(lambda iter: [some_fuc1(map(lambda x: x, xs), dict_name, id) for xs in partition_all(50, iter)]).collect()

完整代码如下

def ctgs(entries):
    col1 = entries[0]
    col2 = entries[1]
    col3 = entries[2]

    rec = {
    up_col1 : col1,
    up_col2 : col2,
    up_col3 : col3

    }

    return rec

def some_func1(rec, dict_name, id):
recs{
    rec_list = list(rec)
    seid = id
    }
    headers = "some header"
    attrburl = "www.someurl.com"

    response = requests.post(attrburl, data=json.dumps(rec_list)), headers)

    return response


class Processor:
    def __init(self, sc, arguments):
    self.sc = sc
    self.env = arguments.env
    self.dte = arguments.dte
    self.sendme = arguments.sendme

    def send_them(ext_data, dict_name,id):
        attributes = ext_data.rdd.map(lambda x: ctgs(x['col1'], x['col2'], x[col3]))

        response = attributes.mapPartitions(lambda iter: [some_fuc1(map(lambda x: x, xs), dict_name, id) for xs in partition_all(50, iter)]).collect()

    def extr_data(self):
        ext_data=spark.sql('''select col1, col2, col3 from table_name''')
        send_them(ext_data,dict_name,id)


    def process(self):
        dict_name = { dict_id: '34343-3434-3433-343'}
        id = 'dfdfd-erere-dfd'
        extr_data()



def argument_parsing(args):
    parser.add_argument("--env", required=True)
    parser.add_argument("--dte",  required=True)
    parser.add_argument("--sendme", required=False)
    args = parser.parse_args(args)
    return args


def main(args):

        arguments = argument_parsing(args)

        sc = SparkSession \
            .builder \
            .appName("job_name") \
            .enableHiveSupport() \
            .getOrCreate()

        sc.sparkContext.setLogLevel("ERROR")

        processor = Processor(sc, arguments)
        processor.process()

显然存在连接到给定URL的问题。您可以从指定超时开始。可能还有一个选项在“请求”模块中启用调试消息。 - gudok
你展示的日志看起来像是驱动程序日志。在执行者日志中你看到了什么?还有你确定出站网络已启用吗? - pltc
将日志级别更改为调试模式(如果尚未更改),将requests.post()调用放入“try-except”块中,并手动检查是否可以从AWS集群访问attribute_url或某个防火墙规则阻止连接。 - zweack
@gudok FYI - 我在想是否我的问题是因为在mappartitions内部调用map函数?其中函数“some_function”位于类外部,调用一个位于类外部的函数是否会引起任何问题。就像这个 https://dev59.com/XqLia4cB1Zd3GeqPfTXP ,但我不确定如何在我的情况下实现这一点。 - user7343922
@gudok,看起来这个问题和我的几乎一样(我在执行器中也遇到了同样的错误),但是我对发布的解决方案不是很清楚,你能否给我一些提示?https://stackoverflow.com/questions/64462317/could-not-find-valid-spark-home-while-searching-on-aws-emr - user7343922
显示剩余3条评论
2个回答

1

你说得对,这是一个与闭包/执行器有关的问题。

在集群中,mapPartitions内部的代码将在执行器上运行。在本地运行会掩盖这些类型的错误,因为它将所有函数限定在运行在您机器上的驱动程序范围内。在“本地”模式下没有作用域问题。

处理闭包/执行器时有两种类型的问题。一个是你的作用域变量无法序列化,另一个是执行器正在运行的环境。

环境检查应该很容易。如果你只是通过ssh连接并尝试连接,你能否从执行器之一连接到URL?(我认为你正在等待DNS查找URL)。实际上,我建议您首先检查EMR集群的安全组,并查看允许访问哪些节点。

范围稍微有点具有挑战性。如果在全局范围内启动requests但不可序列化,这可能会导致问题。(您无法将正在进行中的连接序列化到数据库/网站。)您可以在mapPartitions内部启动它,这将解决该问题。问题是,这通常会立即失败,并且并不真正适合您所描述的问题。除非这导致Python解释器死机并错误地报告它正在等待,否则我认为这不是问题。

是的,请求是从类外部的函数中发布的,我甚至尝试使用1个节点运行此操作。即使如此,仍然存在相同的问题。我能够从执行器访问URL。但正如我在帖子中提到的那样,我收到了“找不到SPARK_HOME路径”的错误。但是我如何在mappartitions内部启动它?我该如何解决这个问题?还有,我如何将此处发布的解决方案应用于我的情况?https://stackoverflow.com/questions/64462317/could-not-find-valid-spark-home-while-searching-on-aws-emr - user7343922
在本地之外,您将始终遇到闭包问题,依赖于执行器上的Spark上下文(-->找不到SPARK_HOME路径)。 (--> mapPartitions内的代码) 您需要在mapPartions内初始化连接,由于您没有发布“请求”代码,我无法告诉您如何执行此操作。 - Matt Andruff

0

我建议您尝试这个Python解决方案。它不使用依赖于Spark会话的Pyspark请求对象。为了使其正常工作,您需要在每个节点上安装此库或者将其传递给spark-submit。基本上,这消除了对Spark Session的要求,因此可以在MapPartition封闭中使用。

基本上,您将使用:

def some_func1(rec, dict_name, id):
    import grequest
    recs{
        rec_list = list(rec)
        seid = id
    }
    headers = "some header"
    attrburl = "www.someurl.com"

    response = grequests.post(attrburl, data=json.dumps(rec_list)), headers)

    return response

是的,在发布悬赏之前,我已经用Python重写了整个程序,但我想知道是否有任何PySpark解决方案。 - user7343922
当使用mapPartitions时,解决方案是使用与语言相关的工具(即Python工具),而不是可能依赖于Spark上下文的Spark相关工具。 基本上,您应该使用Spark,但在'mapParitions'内部使用不依赖于Spark内部的Python代码。 - Matt Andruff

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接