Python 日志记录到数据库

58
我正在寻找一种方法,让Python的日志记录模块能够记录到数据库,并在数据库不可用时回退到文件系统。
基本上有两件事情:如何让记录器记录到数据库,以及如何使其在数据库不可用时回退到文件记录。

1
你碰巧找到现有的实现了吗? - haridsv
2
是的,这个链接非常有帮助:http://github.com/dcramer/django-db-log,至少可以看到所需数据库的架构。 - khelll
你最终得到了什么? - johnny
5个回答

41

我最近成功用Python编写了自己的数据库记录器。由于我找不到任何示例,所以我想在这里发布我的代码。它可与MS SQL一起使用。

数据库表可能如下所示:

CREATE TABLE [db_name].[log](
    [id] [bigint] IDENTITY(1,1) NOT NULL,
    [log_level] [int] NULL,
    [log_levelname] [char](32) NULL,
    [log] [char](2048) NOT NULL,
    [created_at] [datetime2](7) NOT NULL,
    [created_by] [char](32) NOT NULL,
) ON [PRIMARY]

类本身:

class LogDBHandler(logging.Handler):
    '''
    Customized logging handler that puts logs to the database.
    pymssql required
    '''
    def __init__(self, sql_conn, sql_cursor, db_tbl_log):
        logging.Handler.__init__(self)
        self.sql_cursor = sql_cursor
        self.sql_conn = sql_conn
        self.db_tbl_log = db_tbl_log

    def emit(self, record):
        # Set current time
        tm = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(record.created))
        # Clear the log message so it can be put to db via sql (escape quotes)
        self.log_msg = record.msg
        self.log_msg = self.log_msg.strip()
        self.log_msg = self.log_msg.replace('\'', '\'\'')
        # Make the SQL insert
        sql = 'INSERT INTO ' + self.db_tbl_log + ' (log_level, ' + \
            'log_levelname, log, created_at, created_by) ' + \
            'VALUES (' + \
            ''   + str(record.levelno) + ', ' + \
            '\'' + str(record.levelname) + '\', ' + \
            '\'' + str(self.log_msg) + '\', ' + \
            '(convert(datetime2(7), \'' + tm + '\')), ' + \
            '\'' + str(record.name) + '\')'
        try:
            self.sql_cursor.execute(sql)
            self.sql_conn.commit()
        # If error - print it out on screen. Since DB is not working - there's
        # no point making a log about it to the database :)
        except pymssql.Error as e:
            print sql
            print 'CRITICAL DB ERROR! Logging to database not possible!'

并且使用示例:

import pymssql
import time
import logging

db_server = 'servername'
db_user = 'db_user'
db_password = 'db_pass'
db_dbname = 'db_name'
db_tbl_log = 'log'

log_file_path = 'C:\\Users\\Yourname\\Desktop\\test_log.txt'
log_error_level     = 'DEBUG'       # LOG error level (file)
log_to_db = True                    # LOG to database?

class LogDBHandler(logging.Handler):
    [...]

# Main settings for the database logging use
if (log_to_db):
    # Make the connection to database for the logger
    log_conn = pymssql.connect(db_server, db_user, db_password, db_dbname, 30)
    log_cursor = log_conn.cursor()
    logdb = LogDBHandler(log_conn, log_cursor, db_tbl_log)

# Set logger
logging.basicConfig(filename=log_file_path)

# Set db handler for root logger
if (log_to_db):
    logging.getLogger('').addHandler(logdb)
# Register MY_LOGGER
log = logging.getLogger('MY_LOGGER')
log.setLevel(log_error_level)

# Example variable
test_var = 'This is test message'

# Log the variable contents as an error
log.error('This error occurred: %s' % test_var)
上面的代码将同时记录到数据库和文件中。如果不需要文件记录,则跳过“logging.basicConfig(filename=log_file_path)”行。使用“log”记录的所有内容都将作为“MY_LOGGER”记录。如果出现某些外部错误(例如,在导入的模块中或其他地方),则错误将显示为“root”,因为“root”记录器也处于活动状态,并且正在使用数据库处理程序。

10
感谢提供这个脚本。但请注意,建议在INSERT SQL语句中使用参数。这有助于提高性能(因为INSERT语句可以被缓存),同时也有利于安全性。 - Diego Jancic
这个非常好用!我也尝试了使用pyodbc,它也完美地工作了。 - Euler_Salter
你可能还想考虑单独调用 flush() 方法,以及可能的 __del__() 方法(在程序退出时自动刷新),如果你不介意将记录存储到想要在单个事务中写入所有记录到数据库之前。如果存在显著的延迟,这尤其有助于提高性能。 - Max Candocia

18

编写一个处理程序,将日志记录发送到相关的数据库。如果有失败,可以从记录器的处理程序列表中删除它。有许多方法来处理故障模式。


4
我编写了一个线程安全的SQLite处理程序:https://gist.github.com/2662203#file_sqlite_handler.py - Yarin

11

使用备用日志记录器将Python日志记录到数据库


问题

我在服务器内部运行Django项目时遇到了同样的问题,因为有时需要远程查看日志。


解决方案

首先,我们需要一个处理程序来将日志插入数据库中。在此之前,由于我的SQL不好,我选择了ORMSQLAlchemy

模型:

# models.py
from sqlalchemy import Column, Integer, String, DateTime, Text
from sqlalchemy.ext.declarative import declarative_base
import datetime

base = declarative_base()


class Log(base):
    __tablename__ = "log"
    id = Column(Integer, primary_key=True, autoincrement=True)
    time = Column(DateTime, nullable=False, default=datetime.datetime.now)
    level_name = Column(String(10), nullable=True)
    module = Column(String(200), nullable=True)
    thread_name = Column(String(200), nullable=True)
    file_name = Column(String(200), nullable=True)
    func_name = Column(String(200), nullable=True)
    line_no = Column(Integer, nullable=True)
    process_name = Column(String(200), nullable=True)
    message = Column(Text)
    last_line = Column(Text)

这是插入到数据库的CRUD代码:

#crud.py
import sqlalchemy
from .models import base
from traceback import print_exc


class Crud:
    def __init__(self, connection_string=f'sqlite:///log_db.sqlite3',
                 encoding='utf-8',
                 pool_size=10,
                 max_overflow=20,
                 pool_recycle=3600):

        self.connection_string = connection_string
        self.encoding = encoding
        self.pool_size = pool_size
        self.max_overflow = max_overflow
        self.pool_recycle = pool_recycle
        self.engine = None
        self.session = None

    def initiate(self):
        self.create_engine()
        self.create_session()
        self.create_tables()

    def create_engine(self):
        self.engine = sqlalchemy.create_engine(self.connection_string)

    def create_session(self):
        self.session = sqlalchemy.orm.Session(bind=self.engine)

    def create_tables(self):
        base.metadata.create_all(self.engine)

    def insert(self, instances):
        try:
            self.session.add(instances)
            self.session.commit()
            self.session.flush()
        except:
            self.session.rollback()
            raise

    def __del__(self):
        self.close_session()
        self.close_all_connections()

    def close_session(self):
        try:
            self.session.close()
        except:
            print_exc()
        else:
            self.session = None

    def close_all_connections(self):
        try:
            self.engine.dispose()
        except:
            print_exc()
        else:
            self.engine = None

处理程序:
# handler.py
from logging import Handler, getLogger
from traceback import print_exc
from .crud import Crud
from .models import Log


my_crud = Crud(
    connection_string=<connection string to reach your db>,
    encoding='utf-8',
    pool_size=10,
    max_overflow=20,
    pool_recycle=3600)

my_crud.initiate()


class DBHandler(Handler):
    backup_logger = None

    def __init__(self, level=0, backup_logger_name=None):
        super().__init__(level)
        if backup_logger_name:
            self.backup_logger = getLogger(backup_logger_name)

    def emit(self, record):
        try:
            message = self.format(record)
            try:
                last_line = message.rsplit('\n', 1)[-1]
            except:
                last_line = None

            try:
                new_log = Log(module=record.module,
                              thread_name=record.threadName,
                              file_name=record.filename,
                              func_name=record.funcName,
                              level_name=record.levelname,
                              line_no=record.lineno,
                              process_name=record.processName,
                              message=message,
                              last_line=last_line)
                # raise

                my_crud.insert(instances=new_log)
            except:
                if self.backup_logger:
                    try:
                        getattr(self.backup_logger, record.levelname.lower())(record.message)
                    except:
                        print_exc()
                else:
                    print_exc()

        except:
            print_exc()

检查记录器的测试:

# test.py
from logging import basicConfig, getLogger, DEBUG, FileHandler, Formatter
from .handlers import DBHandler

basicConfig(format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
            datefmt='%d-%b-%y %H:%M:%S',
            level=DEBUG)
format = Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')

backup_logger = getLogger('backup_logger')
file_handler = FileHandler('file.log')
file_handler.setLevel(DEBUG)
file_handler.setFormatter(format)
backup_logger.addHandler(file_handler)

db_logger = getLogger('logger')
db_handler = DBHandler(backup_logger_name='backup_logger')
db_handler.setLevel(DEBUG)
db_handler.setFormatter(format)
db_logger.addHandler(db_handler)

if __name__ == "__main__":
    db_logger.debug('debug: hello world!')
    db_logger.info('info: hello world!')
    db_logger.warning('warning: hello world!')
    db_logger.error('error: hello world!')
    db_logger.critical('critical: hello world!!!!')

您可以看到处理程序接受一个备用日志记录器,在数据库插入失败时可以使用它。

一个好的改进可以通过线程将日志记录到数据库中。


1
任何人都可以查看 https://github.com/R-agha-m/db_logger 获取更新。 - r_agha_m

3
我再次挖掘这个话题。有一个使用SqlAlchemy的解决方案(此配方不需要Pyramid): https://docs.pylonsproject.org/projects/pyramid-cookbook/en/latest/logging/sqlalchemy_logger.html 并且您可以通过添加额外字段来改善日志记录,这是一份指南: https://dev59.com/RWMm5IYBdhLWcg3whPPq#17558764

回退到FS

不确定这是否完全正确,但您可以拥有2个处理程序:
  1. 数据库处理程序(写入到数据库)
  2. 文件处理程序(写入到文件或流)
只需用 try-except 包装DB提交即可。 但请注意:该文件将包含所有日志条目,而不仅仅是保存失败的所有条目。

2

这是一个老问题,但是我为其他人提供解答。如果你想使用Python日志记录,可以添加两个处理程序。一个用于写入文件,即旋转文件处理程序。这是稳健的,并且可以在数据库是否启动的情况下完成。

另一个可以写入到另一个服务/模块,比如pymongo集成。

查阅logging.config以了解如何从代码或json设置处理程序。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接