PySpark执行器的日志记录

29

如何使用pyspark在执行器上正确访问Spark的log4j记录器?

在驱动程序中很容易做到这一点,但我似乎无法理解如何在执行器上访问日志记录功能,以便我可以在本地进行记录并让YARN收集本地日志。

是否有任何方法可以访问本地记录器?

标准的日志记录过程不足够,因为我无法从执行器中访问spark context。

3个回答

32

在执行器上无法使用本地log4j记录器。由执行器jvm生成的Python工作进程与Java没有"回调"连接,它们只接收命令。但是有一种方法可以使用标准Python日志记录从执行器记录日志,并通过YARN捕获它们。

在你的HDFS上放置一个Python模块文件,该文件每个Python工作程序只配置一次日志记录,并代理记录函数(将其命名为logger.py):

import os
import logging
import sys

class YarnLogger:
    @staticmethod
    def setup_logger():
        if not 'LOG_DIRS' in os.environ:
            sys.stderr.write('Missing LOG_DIRS environment variable, pyspark logging disabled')
            return 

        file = os.environ['LOG_DIRS'].split(',')[0] + '/pyspark.log'
        logging.basicConfig(filename=file, level=logging.INFO, 
                format='%(asctime)s.%(msecs)03d %(levelname)s %(module)s - %(funcName)s: %(message)s')

    def __getattr__(self, key):
        return getattr(logging, key)

YarnLogger.setup_logger()

然后在您的应用程序中导入此模块:

spark.sparkContext.addPyFile('hdfs:///path/to/logger.py')
import logger
logger = logger.YarnLogger()

你可以像使用普通日志库一样在你的pyspark函数中使用它:

def map_sth(s):
    logger.info("Mapping " + str(s))
    return s

spark.range(10).rdd.map(map_sth).count()

pyspark.log文件将在资源管理器上可见,并在应用程序完成时被收集,因此您可以使用yarn logs -applicationId ....稍后访问这些日志。

输入图像描述

@Mariusz 所以 'LOG_DIRS' 是一个 HDFS 路径,Yarn 会自动将日志复制到 HDFS,对吗? - Zhang Tong
1
@zhangtong 不是的,LOG_DIRS 是 nodemanagers 上的本地路径,用于存储进程数据。当作业结束时,这些文件会被复制到 HDFS 并保留 yarn.log-aggregation.retain-seconds 秒。有关更多信息,请参见 yarn-site.xml 中的 yarn.log-aggregation* 选项。 - Mariusz
2
如果有人不确定,您可以通过执行yarn logs -applicationId <app_id> -log_files pyspark.log单独访问pyspark.log - snark
@RohanA addPyFile 接受的路径需要是 HDFS 路径。如果想要使用本地文件,请修改 spark-submit 的选项,添加 --py-files your_file.py,这样你就不需要在代码中调用 addPyFile 了。PicklingError 可能表明执行器无法找到日志记录器代码。 - Mariusz
1
如果您想直接将其写入“控制台”,可以使用stream=sys.stdout替换文件。 - kulssaka
显示剩余18条评论

9
请注意,Mariusz的答案返回一个指向日志模块的代理。当您的日志需求非常基本时,这是有效的(获得了赞同)。一旦您有兴趣做像配置多个记录器实例或使用多个处理程序这样的事情,它就会不够用了。例如,如果您有一组更大的代码,只想在调试时运行,解决方案之一是检查记录器实例的isEnabledFor方法,如下所示:
logger = logging.getLogger(__name__)
if logger.isEnabledFor(logging.DEBUG):
    # do some heavy calculations and call `logger.debug` (or any other logging method, really)

当在logging模块上调用该方法时(例如Mariusz的答案),会导致失败,因为logging模块没有这样的属性。
解决此问题的一种方法是创建一个名为spark_logging.py的模块,在其中配置日志记录并返回Logger的新实例。以下代码显示了一个示例,它使用dictConfig配置日志记录。它还添加了一个过滤器,以便在使用根记录器时大大减少来自所有工作节点的重复次数(过滤器示例来自Christopher Dunn(ref))。
# spark_logging.py
import logging
import logging.config
import os
import tempfile
from logging import *  # gives access to logging.DEBUG etc by aliasing this module for the standard logging module


class Unique(logging.Filter):
    """Messages are allowed through just once.
    The 'message' includes substitutions, but is not formatted by the
    handler. If it were, then practically all messages would be unique!
    """
    def __init__(self, name=""):
        logging.Filter.__init__(self, name)
        self.reset()

    def reset(self):
        """Act as if nothing has happened."""
        self.__logged = {}

    def filter(self, rec):
        """logging.Filter.filter performs an extra filter on the name."""
        return logging.Filter.filter(self, rec) and self.__is_first_time(rec)

    def __is_first_time(self, rec):
        """Emit a message only once."""
        msg = rec.msg %(rec.args)
        if msg in self.__logged:
            self.__logged[msg] += 1
            return False
        else:
            self.__logged[msg] = 1
            return True


def getLogger(name, logfile="pyspark.log"):
    """Replaces getLogger from logging to ensure each worker configures
    logging locally."""

    try:
        logfile = os.path.join(os.environ['LOG_DIRS'].split(',')[0], logfile)
    except (KeyError, IndexError):
        tmpdir = tempfile.gettempdir()
        logfile = os.path.join(tmpdir, logfile)
        rootlogger = logging.getLogger("")
        rootlogger.addFilter(Unique())
        rootlogger.warning(
            "LOG_DIRS not in environment variables or is empty. Will log to {}."
            .format(logfile))

    # Alternatively, load log settings from YAML or use JSON.
    log_settings = {
        'version': 1,
        'disable_existing_loggers': False,
        'handlers': {
            'file': {
                'class': 'logging.FileHandler',
                'level': 'DEBUG',
                'formatter': 'detailed',
                'filename': logfile
            },
            'default': {
                'level': 'INFO',
                'class': 'logging.StreamHandler',
            },
        },
        'formatters': {
            'detailed': {
                'format': ("%(asctime)s.%(msecs)03d %(levelname)s %(module)s - "
                           "%(funcName)s: %(message)s"),
            },
        },
        'loggers': {
            'driver': {
                'level': 'INFO',
                'handlers': ['file', ]
            },
            'executor': {
                'level': 'DEBUG',
                'handlers': ['file', ]
            },
        }
    }

    logging.config.dictConfig(log_settings)
    return logging.getLogger(name)

您可以导入此模块并为logging本身设置别名:

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Test logging") \
    .getOrCreate()

try:
    spark.sparkContext.addPyFile('s3://YOUR_BUCKET/spark_logging.py')
except:
    # Probably running this locally. Make sure to have spark_logging in the PYTHONPATH
    pass
finally:
    import spark_logging as logging

def map_sth(s):
    log3 = logging.getLogger("executor")
    log3.info("Logging from executor")

    if log3.isEnabledFor(logging.DEBUG):
        log3.debug("This statement is only logged when DEBUG is configured.")

    return s

def main():
    log2 = logging.getLogger("driver")
    log2.info("Logging from within module function on driver")
    spark.range(100).rdd.map(map_sth).count()

if __name__ == "__main__":
    log1 = logging.getLogger("driver")
    log1.info("logging from module level")
    main()

Mariusz's answer类似,日志将可以通过资源管理器进行访问(或在未设置环境变量LOG_DIRS时转储到临时文件夹中)。此脚本顶部所做的错误处理是为了使您能够在本地运行此脚本。
这种方法允许更多的自由:您可以让执行者记录到一个文件中,并在另一个文件中驱动所有种类的聚合计数。
请注意,在这种情况下需要完成的工作略微多于使用类作为内置日志记录模块代理的情况,因为每次在执行程序实例上请求记录器时,都必须对其进行配置。但是,在进行大数据分析时,这可能不会成为主要的时间消耗。;-)

嗨@Oliver W.,非常感谢您。如果在AWS的EMR上使用Spark,LOG_DIRS变量应包含什么内容才能通过资源管理器查看pyspark.log文件? - Roxana

4
我有另一种方法解决PySpark中的日志问题。思路如下:
  • 使用远程日志管理服务(例如Loggly,AWS上的CloudWatch,Azure上的Application Insights等)
  • 在主节点和工作节点上配置相同的日志模块设置,将日志发送到上述服务
如果您已经在使用云服务并且其中许多服务也具有日志收集/管理服务,则这是一个不错的方法。
我在Github上有一个简单的Wordcount示例来演示此方法https://github.com/chhantyal/wordcount 这个Spark应用程序使用标准的logging模块从驱动程序(主节点)和执行器(工作节点)将日志发送到Loggly。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接