应用程序启动时检查数据库模式是否与SQLAlchemy模型匹配

26
为了防止人为错误,我想检查当前SQL数据库模式是否与SQLAlchemy模型代码匹配,并且在应用程序启动时没有需要进行迁移的操作。是否有一种方法可以遍历SQLAlchemy中的所有模型,然后查看数据库模式是否符合模型的预期?
这样做是为了防止后续出现错误(如由于表、字段等缺失而导致的HTTP 500错误)。

您可以使用 https://dev59.com/d3I95IYBdhLWcg3w1Bvo 获取模型的架构,并使用 https://metacpan.org/dist/SQL-Translator/view/script/sqlt-diff 与数据库架构进行比较。 - Andrew
2个回答

30

基于 @yoloseem 上面的提示,下面是完整的答案:

import logging

from sqlalchemy import inspect
from sqlalchemy.ext.declarative.clsregistry import _ModuleMarker
from sqlalchemy.orm import RelationshipProperty

logger = logging.getLogger(__name__)


def is_sane_database(Base, session):
    """Check whether the current database matches the models declared in model base.

    Every model class registered on ``Base`` must have its table present in
    the database, and every column mapped on the model must exist in that
    table. Each mismatch is logged at error level; the scan continues after
    a mismatch so that all problems are reported in one run.

    Columns are compared by their database name (``column.name``), because
    the reflected column list contains database names. ``column.key`` is the
    Python-side identifier and may differ when a column is declared with an
    explicit ``key=``.

    What is not checked:

    * Column types are not verified

    * Relationships are not verified at all (TODO)

    :param Base: Declarative Base for SQLAlchemy models to check

    :param session: SQLAlchemy session bound to an engine

    :return: True if all declared models have corresponding tables and columns.
    """

    engine = session.get_bind()
    iengine = inspect(engine)

    errors = False

    # A set gives constant-time membership tests for every model below
    tables = set(iengine.get_table_names())

    # Go through all SQLAlchemy models
    # NOTE(review): _decl_class_registry is a private attribute of the
    # declarative base -- confirm it exists in the SQLAlchemy version in use.
    for klass in Base._decl_class_registry.values():

        if isinstance(klass, _ModuleMarker):
            # Not a model
            continue

        table = klass.__tablename__
        if table not in tables:
            logger.error("Model %s declares table %s which does not exist in database %s", klass, table, engine)
            errors = True
            continue

        # Check all columns are found
        # Reflected columns look like [{'default': "nextval('sanity_check_test_id_seq'::regclass)", 'autoincrement': True, 'nullable': False, 'type': INTEGER(), 'name': 'id'}]
        columns = {c["name"] for c in iengine.get_columns(table)}
        mapper = inspect(klass)

        for column_prop in mapper.attrs:
            if isinstance(column_prop, RelationshipProperty):
                # TODO: Add sanity checks for relations
                continue

            for column in column_prop.columns:
                # Assume normal flat column; compare the database-side name
                if column.name not in columns:
                    logger.error("Model %s declares column %s which does not exist in database %s", klass, column.name, engine)
                    errors = True

    return not errors

以下是用于测试的py.test代码:

"""Tests for checking database sanity checks functions correctly."""

from pyramid_web20.system.model.sanitycheck import is_sane_database
from sqlalchemy import engine_from_config, Column, Integer, String
import sqlalchemy
from sqlalchemy.ext.declarative import declarative_base, declared_attr
from sqlalchemy.ext.hybrid import hybrid_property
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy import ForeignKey


def setup_module(self):
    """Module-level setup hook for these tests.

    Intended to quiet log output for the tests. The call that would raise
    the sanity-check logger's level is currently commented out, so this
    hook only performs the two imports below.
    """
    # Quiet log output for the tests
    import logging
    from pyramid_web20.system.model.sanitycheck import logger
    # Disabled: uncomment to silence the sanity checker's error logging
    #logger.setLevel(logging.FATAL)


def gen_test_model():
    """Create a fresh declarative base with one minimal model on it.

    A new base is built on every call, so the model registered here does
    not share a base with models produced by other calls.

    :return: Tuple ``(Base, SaneTestModel)``
    """
    model_base = declarative_base()

    class SaneTestModel(model_base):
        """A sample SQLAlchemy model to demonstrate db conflicts."""

        __tablename__ = "sanity_check_test"

        #: Running counter used in foreign key references
        id = Column(Integer, primary_key=True)

    generated = (model_base, SaneTestModel)
    return generated


def gen_relation_models():
    """Create a fresh declarative base with two models joined by a relationship.

    ``RelationTestModel2`` carries a foreign key column pointing at
    ``RelationTestModel`` plus a relationship attribute built on that column.

    :return: Tuple ``(Base, RelationTestModel, RelationTestModel2)``
    """
    model_base = declarative_base()

    class RelationTestModel(model_base):
        __tablename__ = "sanity_check_test_2"
        id = Column(Integer, primary_key=True)

    class RelationTestModel2(model_base):
        __tablename__ = "sanity_check_test_3"
        id = Column(Integer, primary_key=True)

        # Foreign key column backing the relationship declared right after it
        test_relationship_id = Column(ForeignKey("sanity_check_test_2.id"))
        test_relationship = relationship(
            RelationTestModel,
            primaryjoin=(test_relationship_id == RelationTestModel.id),
        )

    generated = (model_base, RelationTestModel, RelationTestModel2)
    return generated


def gen_declarative():
    """Create a fresh declarative base with a model using ``declared_attr``.

    The model maps the attribute ``_password`` to a column named
    ``password`` and exposes it through a hybrid property called
    ``password``.

    :return: Tuple ``(Base, DeclarativeTestModel)``
    """
    model_base = declarative_base()

    class DeclarativeTestModel(model_base):
        __tablename__ = "sanity_check_test_4"
        id = Column(Integer, primary_key=True)

        @declared_attr
        def _password(cls):
            # Column name given explicitly; it differs from the attribute name
            password_column = Column('password', String(256), nullable=False)
            return password_column

        @hybrid_property
        def password(self):
            return self._password

    generated = (model_base, DeclarativeTestModel)
    return generated


def test_sanity_pass(ini_settings, dbsession):
    """See database sanity check completes when tables and columns are created."""

    engine = engine_from_config(ini_settings, 'sqlalchemy.')

    Base, SaneTestModel = gen_test_model()
    Session = sessionmaker(bind=engine)
    session = Session()

    try:
        # Start from a clean slate in case an earlier run left the table behind
        try:
            Base.metadata.drop_all(engine, tables=[SaneTestModel.__table__])
        except sqlalchemy.exc.NoSuchTableError:
            pass

        Base.metadata.create_all(engine, tables=[SaneTestModel.__table__])

        try:
            assert is_sane_database(Base, session) is True
        finally:
            Base.metadata.drop_all(engine)
    finally:
        # Release the session and the engine's pooled connections; the
        # previous version also opened a connection and transaction that
        # were never used or closed.
        session.close()
        engine.dispose()


def test_sanity_table_missing(ini_settings, dbsession):
    """See check fails when there is a missing table"""

    engine = engine_from_config(ini_settings, 'sqlalchemy.')

    Base, SaneTestModel = gen_test_model()
    Session = sessionmaker(bind=engine)
    session = Session()

    try:
        # Make sure the table really is absent before running the check
        try:
            Base.metadata.drop_all(engine, tables=[SaneTestModel.__table__])
        except sqlalchemy.exc.NoSuchTableError:
            pass

        assert is_sane_database(Base, session) is False
    finally:
        # Release the session and the engine's pooled connections; the
        # previous version also opened a connection and transaction that
        # were never used or closed.
        session.close()
        engine.dispose()


def test_sanity_column_missing(ini_settings, dbsession):
    """See check fails when there is a missing column"""

    engine = engine_from_config(ini_settings, 'sqlalchemy.')

    Session = sessionmaker(bind=engine)
    session = Session()
    Base, SaneTestModel = gen_test_model()

    try:
        # Start from a clean slate in case an earlier run left the table behind
        try:
            Base.metadata.drop_all(engine, tables=[SaneTestModel.__table__])
        except sqlalchemy.exc.NoSuchTableError:
            pass
        Base.metadata.create_all(engine, tables=[SaneTestModel.__table__])

        try:
            # Delete one of the columns
            engine.execute("ALTER TABLE sanity_check_test DROP COLUMN id")

            assert is_sane_database(Base, session) is False
        finally:
            # Do not leave the altered table behind for later tests
            Base.metadata.drop_all(engine)
    finally:
        # Release the session and the engine's pooled connections; the
        # previous version also opened a connection and transaction that
        # were never used or closed.
        session.close()
        engine.dispose()


def test_sanity_pass_relationship(ini_settings, dbsession):
    """See database sanity check understands relationships and doesn't deem them missing columns."""

    engine = engine_from_config(ini_settings, 'sqlalchemy.')

    Session = sessionmaker(bind=engine)
    session = Session()

    Base, RelationTestModel, RelationTestModel2 = gen_relation_models()
    model_tables = [RelationTestModel.__table__, RelationTestModel2.__table__]

    try:
        # Start from a clean slate in case an earlier run left the tables behind
        try:
            Base.metadata.drop_all(engine, tables=model_tables)
        except sqlalchemy.exc.NoSuchTableError:
            pass

        Base.metadata.create_all(engine, tables=model_tables)

        try:
            assert is_sane_database(Base, session) is True
        finally:
            Base.metadata.drop_all(engine)
    finally:
        # Release the session and the engine's pooled connections; the
        # previous version also opened a connection and transaction that
        # were never used or closed.
        session.close()
        engine.dispose()


def test_sanity_pass_declarative(ini_settings, dbsession):
    """See database sanity check understands declared attributes and doesn't deem them missing columns."""

    engine = engine_from_config(ini_settings, 'sqlalchemy.')

    Session = sessionmaker(bind=engine)
    session = Session()

    Base, DeclarativeTestModel = gen_declarative()

    try:
        # Start from a clean slate in case an earlier run left the table behind
        try:
            Base.metadata.drop_all(engine, tables=[DeclarativeTestModel.__table__])
        except sqlalchemy.exc.NoSuchTableError:
            pass

        Base.metadata.create_all(engine, tables=[DeclarativeTestModel.__table__])

        try:
            assert is_sane_database(Base, session) is True
        finally:
            Base.metadata.drop_all(engine)
    finally:
        # Release the session and the engine's pooled connections; the
        # previous version also opened a connection and transaction that
        # were never used or closed.
        session.close()
        engine.dispose()

1
太棒了!这个想法非常适合做成一个库。 - Tim Tisdall
1
如果我们除了列名称外还能检查列类型,那就太完美了。@MikkoOhtamaa - Nam G VU

7

1
谢谢!我稍微扩展了一下 - 请看下面。 - Mikko Ohtamaa

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接