SQLAlchemy + Alembic:创建模式迁移

20

我不确定如何定义一个 create schema foo 迁移?我的模型看起来像这样(我正在使用 Flask-Migrate):

class MyTable(db.Model):
    __tablename__ = "my_table"
    __table_args__ = {"schema": "foo"}

    id = ColumnPrimKey()
    name = Column(Text, unique=True, nullable=False)

当我执行mange db upgrade时,由于模式“foo”不存在,我会失败。我如何使用SQLAlchemy和Alembic为模式添加迁移?

当我执行mange db upgrade时,由于模式"foo"不存在,我失败了。我如何使用SQLAlchemy和Alembic为该模式添加迁移?

5个回答

32

我通过修改迁移upgrade命令,使其首先运行以下命令来实现这一点:

op.execute("create schema foo")

而在 downgrade 函数中

op.execute("drop schema foo")

所以整个迁移文件大致如下:

from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision = '6c82c972c61e'
down_revision = '553260b3e828'
branch_labels = None
depends_on = None


def upgrade():
    op.execute("create schema foo")
    ...

def downgrade():
    ...
    op.execute("drop schema foo")

有没有办法让这个与 sqlalchemy.event 一起工作?例如,event.listen(Base.metadata, "before_create", CreateSchema("foo")) - maxcountryman
这解决了迁移的问题,但如果您有另一个迁移,它将无法再次检测到您的模式并生成所有表格... 这可能会带来麻烦。有没有一种方法可以检测不同模式中的更改? - Pstr
1
@Pstr,您可以在元数据类上指定模式属性,方法有两种:metadata = Metadata(schema=<schema-name>) 或者 Base = declarative_base(schema=<schema-name>) - Bin H.
作为对我自己在这里提出的问题的回答,如果您在alembic/env.py文件的context.configure()函数中设置参数include_schemas=True,alembic将会承认模式。虽然不确定为什么这个设置不是默认开启的。 - Pstr
我不明白为什么 revision --autogenerate 没有检测到并创建这些模式。我是否缺乏某些技术上的了解?(关于在哪里放置 alembic_version 的问题主导了一些讨论,但这与自动生成要进入表格的模式无关。) - LondonRob

3
接受答案可能存在的一个问题是,在进行初始迁移时,Alembic 可能会有困难找到创建 alembic_version 的位置。这是因为 op.execute("create schema foo") 只有在 Alembic 尝试查找其 alembic_version 表之后才执行。错误会出现如下所示:
sqlalchemy.exc.ProgrammingError: (psycopg2.errors.InvalidSchemaName) schema "foo" does not exist

一种简单的方法是将 alembic_version 表放在另一个模式中,并将 version_table_schema 传递给 context.configure() (文档)。

然而,在许多情况下,人们可能希望同时(i)创建一个模式作为初始迁移的一部分(例如建立测试环境),并且(ii)将 alembic_version 表放在目标模式中。 在这些情况下,一种替代方法是将模式创建委托给 env.py。 示例:

# env.py
from sqlalchemy import create_engine

from bar import create_database_with_schema_if_not_exists

SQLALCHEMY_DATABASE_URI = ...
schema = "foo"
engine = create_engine(SQLALCHEMY_DATABASE_URI)
create_database_with_schema_if_not_exists(engine, schema)
...

# bar.py
import sqlalchemy
from sqlalchemy_utils import create_database, database_exists

def create_database_with_schema_if_not_exists(engine, schema):
    if not database_exists(engine.url):
        create_database(engine.url)
    if not engine.dialect.has_schema(engine, schema):
        engine.execute(sqlalchemy.schema.CreateSchema(schema))

1

由于 alembic_version 表是在新模式内创建的,因此使用 mssql+pyodbc 方言时,其他方法都对我无效。

这个可以:

with context.begin_transaction():
    context.execute(
        f"""
        IF NOT EXISTS (SELECT * FROM sys.schemas WHERE name = '{schema}')
        BEGIN 
          EXEC('CREATE SCHEMA {schema}') 
        END
        """
    )
    context.run_migrations()

1
另一种选择是添加以下函数来修改env.py中的MigrationScript指令:
from alembic import operations

def process_revision_directives(context, revision, directives):
    """Modify the MigrationScript directives to create schemata as required.
    """
    script = directives[0]
    for schema in frozenset(i.schema for i in target_metadata.tables.values()):
        script.upgrade_ops.ops.insert(
            0, operations.ops.ExecuteSQLOp(f"CREATE SCHEMA IF NOT EXISTS {schema}"))
        script.downgrade_ops.ops.append(
            operations.ops.ExecuteSQLOp(f"DROP SCHEMA IF EXISTS {schema} RESTRICT"))

然后在context.configure中添加process_revision_directives=process_revision_directives


0
这里有一个版本,当迁移中首次出现时,它会自动创建模式(将其添加到env.py中或至少从那里导入):
import logging
from collections.abc import Iterable
from typing import Any

import alembic
import sqlalchemy.sql.base
from alembic.autogenerate.api import AutogenContext
from alembic.operations.ops import (
    CreateTableOp,
    ExecuteSQLOp,
    UpgradeOps,
)

_logger = logging.getLogger(f"alembic.{__name__}")


class ExecuteArbitraryDDLOp(ExecuteSQLOp):
    def __init__(
        self,
        ddl: sqlalchemy.sql.base.Executable | str,
        reverse_ddl: sqlalchemy.sql.base.Executable | str,
        *,
        execution_options: dict[str, Any] | None = None,
    ) -> None:
        """A DDL Operation with both upgrade and downgrade commands."""
        super().__init__(ddl, execution_options=execution_options)
        self.reverse_ddl = reverse_ddl

    def reverse(self) -> "ExecuteArbitraryDDLOp":
        """Return the reverse of this ExecuteArbitraryDDLOp (used for downgrades)."""
        return ExecuteArbitraryDDLOp(
            ddl=self.reverse_ddl, reverse_ddl=self.sqltext, execution_options=self.execution_options
        )


@alembic.autogenerate.comparators.dispatch_for("schema")
def create_missing_schemas(
    autogen_context: AutogenContext, upgrade_ops: UpgradeOps, schema_names: Iterable[str | None]
) -> None:
    """Creates missing schemas.

    This depends on sqla/alembic to give us all existing
    schemas in the schema_names argument.
    """
    used_schemas = set()
    for operations_group in upgrade_ops.ops:
        # We only care about Tables at the top level, so this is enough for us.
        if isinstance(operations_group, CreateTableOp) and operations_group.schema:
            used_schemas.add(operations_group.schema)

    existing_schemas = set(schema_names)
    missing_schemas = used_schemas - existing_schemas
    if missing_schemas:
        for schema in missing_schemas:
            _logger.info("Add migration ops for schema: %s", schema)
            upgrade_ops.ops.insert(
                0,
                ExecuteArbitraryDDLOp(
                    ddl=f"CREATE SCHEMA {schema}",
                    reverse_ddl=f"DROP SCHEMA {schema}",
                ),
            )

来源: https://www.katzien.de/en/posts/2023-11-16-create-missing-schemas-in-alembic/(免责声明:本人的博客)

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接