使用r2dbc实现的基于架构的多租户应用程序

8
我正在开发一个使用Spring-Webflux和Spring-data-r2dbc的多租户响应式应用程序,使用r2dbc驱动程序连接到Postgresql数据库。多租户部分是基于模式的:每个租户一个模式。因此,根据上下文(例如已登录的用户),请求将命中数据库的某个模式。
我在研究如何在r2dbc中实现这一点。理想情况下,它应该像Hibernate一样使用MultiTenantConnectionProvider(请参见示例16.3)。
我找到了一些资料并做了一些工作:
  • Could use AbstractRoutingConnectionFactory as mentioned here. But I'm force to create a ConnectionFactory by tenant/schema. It seems to me that is far from efficient/scalable, I would rather use a connection pool like r2dbc-pool
  • I looked at PostgresqlConnectionFactory. Interesting thing here is that on prepareConnection there's a call on setSchema(connection):

    private Mono<Void> setSchema(PostgresqlConnection connection) {
        if (this.configuration.getSchema() == null) {
            return Mono.empty();
        }
    
        return connection.createStatement(String.format("SET SCHEMA '%s'", this.configuration.getSchema()))
            .execute()
            .then();
    }
    
也许我需要找到一种方法来覆盖这个设置,以便从上下文动态获取模式,而不是从配置中获取?
  • Otherwise I could try to specify the schema in the request as a table prefix:

        String s = "tenant-1";
        databaseClient.execute("SELECT * FROM \"" + s + "\".\"city\"")
                .as(City.class)
                .fetch()
                .all()
    
但是我不能再使用SpringData了,或者我需要覆盖每个请求以将租户作为参数传递。任何提示/帮助都将不胜感激 :)
4个回答

1

我已经创建了一个使用每个数据库策略的r2dbc多租户示例。

在这里检查完整的示例代码

在一些数据库中,模式和数据库概念是等效的。如果您坚持使用每个模式的策略,在获取连接时添加SQL以选择模式(请研究您使用的数据库,并确定设置模式的正确子句)。


1
感谢您的回答。我最终采用了以下解决方案:
创建一个按租户/模式构建的ConnectionFactory:
public class CloudSpringUtilsConnectionFactoryBuilder implements ConnectionFactoryBuilder {

@Override
public ConnectionFactory buildConnectionFactory(String schema) {
    PostgresqlConnectionConfiguration configuration = getPostgresqlConnectionConfigurationBuilder(schema)
            .build();
    return new PostgresqlConnectionFactory(configuration);
}

@Override
public ConnectionFactory buildSimpleConnectionFactory() {
    PostgresqlConnectionConfiguration configuration = getPostgresqlConnectionConfigurationBuilder(null)
            .build();
    return new PostgresqlConnectionFactory(configuration);
}

protected PostgresqlConnectionConfiguration.Builder getPostgresqlConnectionConfigurationBuilder(String schema) {
    return PostgresqlConnectionConfiguration
            .builder()
            .username(dbUser)
            .password(dbPassword)
            .host(dbHost)
            .port(dbPort)
            .database(dbName)
            .schema(schema);
}

创建一个TenantRoutingConnectionFactory,根据租户获取正确的ConnectionFactory。在我们的情况下,租户是从身份验证主体(将令牌转换为UserProfile)中提取的。
public class TenantRoutingConnectionFactory extends AbstractRoutingConnectionFactory {

private final DatabaseMigrationService databaseMigrationService;
private final ConnectionFactoryBuilder connectionFactoryBuilder;

private final Map<String, ConnectionFactory> targetConnectionFactories = new ConcurrentHashMap<>();

@PostConstruct
private void init() {
    setLenientFallback(false);
    setTargetConnectionFactories(new HashMap<>());
    setDefaultTargetConnectionFactory(connectionFactoryBuilder.buildConnectionFactory());
}

@Override
protected Mono<Object> determineCurrentLookupKey() {
    return ReactiveSecurityContextHolder.getContext()
            .map(this::getTenantFromContext)
            .flatMap(tenant -> databaseMigrationService.migrateTenantIfNeeded(tenant)
                    .thenReturn(tenant));
}

private String getTenantFromContext(SecurityContext securityContext) {
    String tenant = null;
    Object principal = securityContext.getAuthentication().getPrincipal();
    if (principal instanceof UserProfile) {
        UserProfile userProfile = (UserProfile) principal;
        tenant = userProfile.getTenant();
    }
    ...
    log.debug("Tenant resolved: " + tenant);
    return tenant;
}

@Override
protected Mono<ConnectionFactory> determineTargetConnectionFactory() {
    return determineCurrentLookupKey().map(k -> {
        String key = (String) k;
        if (!targetConnectionFactories.containsKey(key)) {
            targetConnectionFactories.put(key, connectionFactoryBuilder.buildConnectionFactory(key));
        }
        return targetConnectionFactories.get(key);
    });
}

请注意,我们在DatabaseMigrationService中使用Flyway来为我们获取的每个租户创建和迁移模式。

1
我也遇到了这个问题。
目前,我正在进行以下操作:
  • Publish the PostgresqlConnectionConfigurationBuilder and the PostgresqlConnectionFactory as a Bean:

    @Bean
    public PostgresqlConnectionConfiguration.Builder postgresqlConnectionConfiguration() {
        return PostgresqlConnectionConfiguration.builder()
                .host("localhost")
                .port(5432)
                .applicationName("team-toplist-service")
                .database("db")
                .username("user")
                .password("password");
    }
    
    @Bean
    @Override
    public PostgresqlConnectionFactory connectionFactory() {
        return new PostgresqlConnectionFactory(postgresqlConnectionConfiguration()
                .build());
    }
    
为了在我的业务方法中创建一个新的PostgresqlConnectionFactory,我需要使用注入的PostgresqlConnectionConfigurationBuilder实例——但现在也要调用构建器上的"schema"设置器(从传递下来的org.springframework.web.reactive.function.server.ServerRequest中提取租户信息后)。
我的数据库模式遵循appname_tenantId的模式,因此我们有一个静态配置的"appName",例如"app_name",因此我最终会得到像"app_name_foo_bar123"这样的模式名称。
接下来,我们有一个租户标识符,它在我的情况下将来自于请求头,该请求头由坐落在上游的Apache服务器设置(传递X-Tenant-Id头以便不依赖于URL进行特定于租户的路由)。
所以我的“逻辑”目前看起来有点像这样:
public Flux<TopTeam> getTopTeams(ServerRequest request) {

    List<String> tenantHeader = request.headers().header("X-Tenant-Id");
    // resolve relevant schema name on the fly
    String schema = (appName+ "_" + tenantHeader.iterator().next()).replace("-", "_");
    System.out.println("Using schema: " + schema);
    // configure connfactory with schema set on the builder
    PostgresqlConnectionFactory cf = new PostgresqlConnectionFactory(postgresqlConnectionConfiguration.schema(schema).build());
    // init new DatabaseClient with tenant specific connection
    DatabaseClient cli = DatabaseClient.create(cf);


        return cli
                .execute("select * from top_teams ").fetch().all()
                .flatMap(map -> {

                    ...
                    });
                });
    }

当然,这种逻辑可以被抽象出来,但我不确定应该放在哪里,也许可以将其移动到MethodArgumentResolver中,这样我们就可以注入一个已经配置好的DatabaseClient。


PS:这仅解决了使用DatabaseClient时的多租户问题。我不确定如何使其与R2dbcRepositories兼容。


至少对于ddl来说,这非常有用,因为Spring Data R2dbc不执行DDL操作,例如Spring Data JPA。换句话说,我们可以使用此策略进行DDL,并使用R2dbcRepositories进行DML。太棒了! - GtdDev

0

感谢Charlie Carver的回答,以下是我解决这个问题的方法:

控制器:

    @PostMapping(MAP + PATH_DDL_PROC_DB)  //PATH_DDL_PROC_DB = "/database/{db}/{schema}/{table}"
    public Flux<Object> createDbByDb(
            @PathVariable("db") String db,
            @PathVariable("schema") String schema,
            @PathVariable("table") String table) {
        return ddlProcService.createDbByDb(db,schema,table);

服务:

    public Flux<Object> createDbByDb(String db,String schema,String table) {
        return ddl.createDbByDb(db,schema,table);
    }

代码库:

    @Autowired
    PostgresqlConnectionConfiguration.Builder connConfig;

    public Flux<Object> createDbByDb(String db,String schema,String table) {
        return createDb(db).thenMany(
                Mono.from(connFactory(connConfig.database(db)).create())
                    .flatMapMany(
                            connection ->
                                    Flux.from(connection
                                                      .createBatch()
                                                      .add(sqlCreateSchema(db))
                                                      .add(sqlCreateTable(db,table))
                                                      .add(sqlPopulateTable(db,table))
                                                      .execute()
                                             )));
    }

    private Mono<Void> createDb(String db) {

        PostgresqlConnectionFactory
                connectionFactory = connFactory(connConfig);

        DatabaseClient ddl = DatabaseClient.create(connectionFactory);

        return ddl
                .execute(sqlCreateDb(db))
                .then();
    }

连接类:

@Slf4j
@Configuration
@EnableR2dbcRepositories
public class Connection extends AbstractR2dbcConfiguration {

    /*
     **********************************************
     * Spring Data jdbc:
     *      DDL: does support JPA.
     *
     * Spring Data R2DBC
     *      DDL:
     *          -does no support JPA
     *          -To achieve DDL, uses R2dbc.DataBaseClient
     *
     *      DML:
     *          -it uses R2dbcREpositories
     *          -R2dbcRepositories is different than
     *          R2dbc.DataBaseClient
     * ********************************************
     */
    @Bean
    public PostgresqlConnectionConfiguration.Builder connectionConfig() {
        return PostgresqlConnectionConfiguration
                .builder()
                .host("db-r2dbc")
                .port(5432)
                .username("root")
                .password("root");
    }

    @Bean
    public PostgresqlConnectionFactory connectionFactory() {
        return
                new PostgresqlConnectionFactory(
                        connectionConfig().build()
                );
    }
}

DDL脚本:
@Getter
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class DDLScripts {

    public static final String SQL_GET_TASK = "select * from tasks";

    public static String sqlCreateDb(String db) {
        String sql = "create database %1$s;";
        String[] sql1OrderedParams = quotify(new String[]{db});
        String finalSql = format(sql,(Object[]) sql1OrderedParams);
        return finalSql;
    }

    public static String sqlCreateSchema(String schema) {
        String sql = "create schema if not exists %1$s;";
        String[] sql1OrderedParams = quotify(new String[]{schema});
        return format(sql,(Object[])  sql1OrderedParams);
    }

    public static String sqlCreateTable(String schema,String table) {

        String sql1 = "create table %1$s.%2$s " +
                "(id serial not null constraint tasks_pk primary key, " +
                "lastname varchar not null); ";
        String[] sql1OrderedParams = quotify(new String[]{schema,table});
        String sql1Final = format(sql1,(Object[])  sql1OrderedParams);

        String sql2 = "alter table %1$s.%2$s owner to root; ";
        String[] sql2OrderedParams = quotify(new String[]{schema,table});
        String sql2Final = format(sql2,(Object[])  sql2OrderedParams);

        return sql1Final + sql2Final;
    }

    public static String sqlPopulateTable(String schema,String table) {

        String sql = "insert into %1$s.%2$s values (1, 'schema-table-%3$s');";
        String[] sql1OrderedParams = quotify(new String[]{schema,table,schema});
        return format(sql,(Object[]) sql1OrderedParams);
    }

    private static String[] quotify(String[] stringArray) {

        String[] returnArray = new String[stringArray.length];

        for (int i = 0; i < stringArray.length; i++) {
            returnArray[i] = "\"" + stringArray[i] + "\"";
        }
        return returnArray;
    }
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接