使用Spring和Hibernate将读写事务路由到主数据库,将只读事务路由到副本数据库

11
我有一个使用Hibernate/JPA、Spring和Jersey的应用程序。在我的应用程序上下文中,我设置了数据源,定义了一个实体管理器工厂,将事务管理器与该实体管理器工厂进行了设置,并对各种服务方法进行了标记事务注释,因此我还有tx:annotation-driven定义来连接所需的事务管理器。这个设置非常好用,我已经能够很好地读取和写入数据。我想要将数据库设置为一个主库和多个从库(MySQL)。因此,我希望所有标记了事务注释的方法都使用指向主数据库服务器的数据源,而其他方法则使用从库的连接池。
我尝试创建了两个不同的数据源,两个不同的实体管理器工厂和两个不同的持久化单元——这显然是一种丑陋的方式。我尝试过MySQL代理,但我们遇到了更多问题。连接池已经在Servlet容器中处理了。我是否可以在Tomcat中实现某些东西来读取事务并将其指向正确的数据库服务器,或者是否有一种方法可以让所有标记了事务注释的方法都使用特定的数据源?
4个回答

12

Spring事务路由

为了将读写事务路由到主节点并将只读事务路由到副本节点,我们可以定义一个连接到主节点的ReadWriteDataSource和一个连接到副本节点的ReadOnlyDataSource

读写和只读事务路由是通过Spring AbstractRoutingDataSource抽象实现的,该抽象由TransactionRoutingDatasource实现,如下图所示:

Read-write and read-only transaction routing with Spring

TransactionRoutingDataSource非常易于实现,其示例如下:

public class TransactionRoutingDataSource 
        extends AbstractRoutingDataSource {

    @Nullable
    @Override
    protected Object determineCurrentLookupKey() {
        return TransactionSynchronizationManager
            .isCurrentTransactionReadOnly() ?
            DataSourceType.READ_ONLY :
            DataSourceType.READ_WRITE;
    }
}

基本上,我们检查Spring TransactionSynchronizationManager类,该类存储当前事务上下文,以检查当前正在运行的Spring事务是否为只读。

determineCurrentLookupKey方法返回将用于选择读写或只读JDBC DataSource的鉴别器值。

DataSourceType只是一个基本的Java枚举,定义了我们的事务路由选项:

public enum  DataSourceType {
    READ_WRITE,
    READ_ONLY
}

Spring读写和只读JDBC DataSource配置

DataSource的配置如下:

@Configuration
@ComponentScan(
    basePackages = "com.vladmihalcea.book.hpjp.util.spring.routing"
)
@PropertySource(
    "/META-INF/jdbc-postgresql-replication.properties"
)
public class TransactionRoutingConfiguration 
        extends AbstractJPAConfiguration {

    @Value("${jdbc.url.primary}")
    private String primaryUrl;

    @Value("${jdbc.url.replica}")
    private String replicaUrl;

    @Value("${jdbc.username}")
    private String username;

    @Value("${jdbc.password}")
    private String password;

    @Bean
    public DataSource readWriteDataSource() {
        PGSimpleDataSource dataSource = new PGSimpleDataSource();
        dataSource.setURL(primaryUrl);
        dataSource.setUser(username);
        dataSource.setPassword(password);
        return connectionPoolDataSource(dataSource);
    }

    @Bean
    public DataSource readOnlyDataSource() {
        PGSimpleDataSource dataSource = new PGSimpleDataSource();
        dataSource.setURL(replicaUrl);
        dataSource.setUser(username);
        dataSource.setPassword(password);
        return connectionPoolDataSource(dataSource);
    }

    @Bean
    public TransactionRoutingDataSource actualDataSource() {
        TransactionRoutingDataSource routingDataSource = 
            new TransactionRoutingDataSource();

        Map<Object, Object> dataSourceMap = new HashMap<>();
        dataSourceMap.put(
            DataSourceType.READ_WRITE, 
            readWriteDataSource()
        );
        dataSourceMap.put(
            DataSourceType.READ_ONLY, 
            readOnlyDataSource()
        );

        routingDataSource.setTargetDataSources(dataSourceMap);
        return routingDataSource;
    }

    @Override
    protected Properties additionalProperties() {
        Properties properties = super.additionalProperties();
        properties.setProperty(
            "hibernate.connection.provider_disables_autocommit",
            Boolean.TRUE.toString()
        );
        return properties;
    }

    @Override
    protected String[] packagesToScan() {
        return new String[]{
            "com.vladmihalcea.book.hpjp.hibernate.transaction.forum"
        };
    }

    @Override
    protected String databaseType() {
        return Database.POSTGRESQL.name().toLowerCase();
    }

    protected HikariConfig hikariConfig(
            DataSource dataSource) {
        HikariConfig hikariConfig = new HikariConfig();
        int cpuCores = Runtime.getRuntime().availableProcessors();
        hikariConfig.setMaximumPoolSize(cpuCores * 4);
        hikariConfig.setDataSource(dataSource);

        hikariConfig.setAutoCommit(false);
        return hikariConfig;
    }

    protected HikariDataSource connectionPoolDataSource(
            DataSource dataSource) {
        return new HikariDataSource(hikariConfig(dataSource));
    }
}

/META-INF/jdbc-postgresql-replication.properties 资源文件提供了读写和只读 JDBC DataSource 组件的配置:

hibernate.dialect=org.hibernate.dialect.PostgreSQL10Dialect

jdbc.url.primary=jdbc:postgresql://localhost:5432/high_performance_java_persistence
jdbc.url.replica=jdbc:postgresql://localhost:5432/high_performance_java_persistence_replica

jdbc.username=postgres
jdbc.password=admin

jdbc.url.primary属性定义了主节点的URL,而jdbc.url.replica则定义了副本节点的URL。

readWriteDataSource Spring组件定义了读写JDBC DataSource,而readOnlyDataSource组件则定义了只读JDBC DataSource

请注意,读写和只读数据源都使用HikariCP进行连接池管理。有关使用数据库连接池的好处的更多详细信息,请参见相关文档。

actualDataSource作为读写和只读数据源的外观,使用TransactionRoutingDataSource实用程序实现。

readWriteDataSource使用DataSourceType.READ_WRITE键进行注册,而readOnlyDataSource则使用DataSourceType.READ_ONLY键进行注册。

因此,在执行读写@Transactional方法时,将使用readWriteDataSource,而在执行@Transactional(readOnly = true)方法时,则将使用readOnlyDataSource

请注意,additionalProperties方法定义了hibernate.connection.provider_disables_autocommit Hibernate属性,我将其添加到Hibernate中,以延迟RESOURCE_LOCAL JPA事务的数据库获取。 hibernate.connection.provider_disables_autocommit不仅可以让您更好地使用数据库连接,而且它是我们使此示例工作的唯一方式,因为如果没有此配置,则会在调用determineCurrentLookupKey方法TransactionRoutingDataSource之前获取连接。
构建JPA EntityManagerFactory所需的其余Spring组件由AbstractJPAConfiguration基类定义。
基本上,actualDataSource被DataSource-Proxy进一步包装并提供给JPA的ENtityManagerFactory。您可以查看GitHub上的源代码以获取更多详细信息。

测试时间

为了检查事务路由是否起作用,我们将通过在postgresql.conf配置文件中设置以下属性来启用PostgreSQL查询日志:
log_min_duration_statement = 0
log_line_prefix = '[%d] '

log_min_duration_statement属性设置用于记录所有PostgreSQL语句,而第二个属性将数据库名称添加到SQL日志中。

因此,在调用newPostfindAllPostsByTitle方法时,应像这样:

Post post = forumService.newPost(
    "High-Performance Java Persistence",
    "JDBC", "JPA", "Hibernate"
);

List<Post> posts = forumService.findAllPostsByTitle(
    "High-Performance Java Persistence"
);

我们可以看到,PostgreSQL 记录了以下消息:
[high_performance_java_persistence] LOG:  execute <unnamed>: 
    BEGIN

[high_performance_java_persistence] DETAIL:  
    parameters: $1 = 'JDBC', $2 = 'JPA', $3 = 'Hibernate'
[high_performance_java_persistence] LOG:  execute <unnamed>: 
    select tag0_.id as id1_4_, tag0_.name as name2_4_ 
    from tag tag0_ where tag0_.name in ($1 , $2 , $3)

[high_performance_java_persistence] LOG:  execute <unnamed>: 
    select nextval ('hibernate_sequence')

[high_performance_java_persistence] DETAIL:  
    parameters: $1 = 'High-Performance Java Persistence', $2 = '4'
[high_performance_java_persistence] LOG:  execute <unnamed>: 
    insert into post (title, id) values ($1, $2)

[high_performance_java_persistence] DETAIL:  
    parameters: $1 = '4', $2 = '1'
[high_performance_java_persistence] LOG:  execute <unnamed>: 
    insert into post_tag (post_id, tag_id) values ($1, $2)

[high_performance_java_persistence] DETAIL:  
    parameters: $1 = '4', $2 = '2'
[high_performance_java_persistence] LOG:  execute <unnamed>: 
    insert into post_tag (post_id, tag_id) values ($1, $2)

[high_performance_java_persistence] DETAIL:  
    parameters: $1 = '4', $2 = '3'
[high_performance_java_persistence] LOG:  execute <unnamed>: 
    insert into post_tag (post_id, tag_id) values ($1, $2)

[high_performance_java_persistence] LOG:  execute S_3: 
    COMMIT
    
[high_performance_java_persistence_replica] LOG:  execute <unnamed>: 
    BEGIN
    
[high_performance_java_persistence_replica] DETAIL:  
    parameters: $1 = 'High-Performance Java Persistence'
[high_performance_java_persistence_replica] LOG:  execute <unnamed>: 
    select post0_.id as id1_0_, post0_.title as title2_0_ 
    from post post0_ where post0_.title=$1

[high_performance_java_persistence_replica] LOG:  execute S_1: 
    COMMIT

使用 high_performance_java_persistence 前缀的日志语句在主节点上执行,而使用 high_performance_java_persistence_replica 的日志语句则在副本节点上执行。

所以,一切都运作得很顺利!

所有源代码都可以在我的 High-Performance Java Persistence GitHub 存储库中找到,因此您也可以尝试它。

结论

这种要求非常有用,因为 单主数据库复制 架构不仅提供了容错性和更好的可用性,还允许我们通过添加更多的副本节点来扩展读操作。


除了您的博客之外,我也关注了这个——https://fable.sh/blog/splitting-read-and-write-operations-in-spring-boot/,在简单情况下它能够起作用。我的项目基于spring boot 2,在我的rest控制器中,我的一个方法是先读后写。这里选择的默认数据源是slave,但在同一连接中切换数据失败。许多人在不同的文章中报告了这个问题。在这里贴一些链接——https://dev59.com/h5ffa4cB1Zd3GeqP-Jrm。 - thealchemist
http://fedulov.website/2015/10/14/dynamic-datasource-routing-with-spring/ 这里也有人报告了问题。我想知道创建两个不同的存储库,例如一个包含写方法,另一个包含读方法,分别将它们与主数据源和从数据源分别连接是否是唯一的解决方案? - thealchemist
在将 hibernate.connection.handling_mode 设置为 DELAYED_ACQUISITION_AND_RELEASE_AFTER_TRANSACTION 后才能正常工作,否则 lookupKey 方法会在设置 isCurrentTransactionReadOnly 标志之前被调用。 - ramdane.i
如果您使用RESOURCE_LOCAL,则不需要这样做。相反,您需要使用 hibernate.connection.provider_disables_autocommit - Vlad Mihalcea

8
这是我最终采取的方案,并且效果很好。实体管理器只能有一个bean作为数据源。因此,我需要创建一个bean,以在必要时在两者之间进行路由。那个bean是我用于JPA实体管理器的那个。
我在Tomcat中设置了两个不同的数据源。在server.xml中,我创建了两个资源(数据源)。
<Resource name="readConnection" auth="Container" type="javax.sql.DataSource"
          username="readuser" password="readpass"
          url="jdbc:mysql://readipaddress:3306/readdbname"
          driverClassName="com.mysql.jdbc.Driver"
          initialSize="5" maxWait="5000"
          maxActive="120" maxIdle="5"
          validationQuery="select 1"
          poolPreparedStatements="true"
          removeAbandoned="true" />
<Resource name="writeConnection" auth="Container" type="javax.sql.DataSource"
          username="writeuser" password="writepass"
          url="jdbc:mysql://writeipaddress:3306/writedbname"
          driverClassName="com.mysql.jdbc.Driver"
          initialSize="5" maxWait="5000"
          maxActive="120" maxIdle="5"
          validationQuery="select 1"
          poolPreparedStatements="true"
          removeAbandoned="true" />

您可以将数据库表放在同一台服务器上,此时IP地址或域名相同,只是不同的数据库 - 您已经了解了要点。

然后,我在Tomcat的context.xml文件中添加了一个资源链接,引用了这些资源。

<ResourceLink name="readConnection" global="readConnection" type="javax.sql.DataSource"/>
<ResourceLink name="writeConnection" global="writeConnection" type="javax.sql.DataSource"/>

这些资源链接是Spring在应用程序上下文中读取的。
在应用程序上下文中,我为每个资源链接添加了一个bean定义,并添加了一个额外的bean定义,引用了我创建的一个接受两个先前创建的bean(bean定义)的映射(枚举)的数据源路由器bean。
<!--
Data sources representing master (write) and slaves (read).
-->
<bean id="readDataSource" class="org.springframework.jndi.JndiObjectFactoryBean">
    <property name="jndiName" value="readConnection" /> 
    <property name="resourceRef" value="true" />
    <property name="lookupOnStartup" value="true" />
    <property name="cache" value="true" />
    <property name="proxyInterface" value="javax.sql.DataSource" />  
</bean>

<bean id="writeDataSource" class="org.springframework.jndi.JndiObjectFactoryBean">
    <property name="jndiName" value="writeConnection" />
    <property name="resourceRef" value="true" />
    <property name="lookupOnStartup" value="true" />
    <property name="cache" value="true" />
    <property name="proxyInterface" value="javax.sql.DataSource" />
</bean>

<!--
Provider of available (master and slave) data sources.
-->
<bean id="dataSource" class="com.myapp.dao.DatasourceRouter">
    <property name="targetDataSources">
      <map key-type="com.myapp.api.util.AvailableDataSources">
         <entry key="READ" value-ref="readDataSource"/>
         <entry key="WRITE" value-ref="writeDataSource"/>
      </map>
   </property>
   <property name="defaultTargetDataSource" ref="writeDataSource"/>
</bean>

实体管理器bean定义然后引用了dataSource bean。
<bean id="entityManagerFactory" class="org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean">
    <property name="dataSource" ref="dataSource" />
    <property name="persistenceUnitName" value="${jpa.persistenceUnitName}" />
    <property name="jpaVendorAdapter"> 
        <bean class="org.springframework.orm.jpa.vendor.HibernateJpaVendorAdapter"> 
            <property name="databasePlatform" value="${jpa.dialect}"/>
            <property name="showSql" value="${jpa.showSQL}" />
        </bean>
    </property>
</bean>

我在一个属性文件中定义了一些属性,但你可以用自己特定的值替换${}的值。现在我有一个bean,它使用另外两个表示我的两个数据源的bean。其中一个bean是我用于JPA的bean。它不知道任何路由发生的情况。
现在是路由bean。
public class DatasourceRouter extends AbstractRoutingDataSource{

    @Override
    public Logger getParentLogger() throws SQLFeatureNotSupportedException{
    // TODO Auto-generated method stub
    return null;
    }

    @Override
    protected Object determineCurrentLookupKey(){
    return DatasourceProvider.getDatasource();
    }

}

覆盖的方法被实体管理器调用以基本确定数据源。DatasourceProvider具有一个线程本地(线程安全)属性,具有getter和setter方法,以及清除数据源方法进行清理。

public class DatasourceProvider{
    private static final ThreadLocal<AvailableDataSources> datasourceHolder = new ThreadLocal<AvailableDataSources>();

    public static void setDatasource(final AvailableDataSources customerType){
    datasourceHolder.set(customerType);
    }

    public static AvailableDataSources getDatasource(){
    return (AvailableDataSources) datasourceHolder.get();
    }

    public static void clearDatasource(){
    datasourceHolder.remove();
    }

}

我有一个通用的DAO实现,其中包含我用来处理各种常规JPA调用(getReference、persist、createNamedQUery和getResultList等)的方法。在它调用entityManager执行所需操作之前,我将DatasourceProvider的数据源设置为读或写。该方法也可以处理传入该值以使其更加动态。以下是示例方法。

@Override
public List<T> findByNamedQuery(final String queryName, final Map<String, Object> properties, final int... rowStartIdxAndCount)
{
DatasourceProvider.setDatasource(AvailableDataSources.READ);
final TypedQuery<T> query = entityManager.createNamedQuery(queryName, persistentClass);
if (!properties.isEmpty())
{
    bindNamedQueryParameters(query, properties);
}
appyRowLimits(query, rowStartIdxAndCount);

return query.getResultList();
}

AvailableDataSources是一个枚举类型,有READ和WRITE两个选项,用于引用相应的数据源。您可以在应用程序上下文中定义的bean中的映射中查看该枚举类型。


哦,还要确保MySQL JAR在Tomcat中,否则数据源(资源)就无法工作。 - Elrond
1
以下是使用自定义注释增强此方法的一些改进:http://fedulov.website/2015/10/14/dynamic-datasource-routing-with-spring/ - Alex Fedulov

1
我有同样的需求:使用经典的主/从架构来扩展读取操作,将只读数据库和写入数据库之间的连接路由起来。
最终,我找到了一种简洁的解决方案,使用了Spring框架中的AbstractRoutingDataSource基类。它允许你注入一个数据源,根据你编写的条件将请求路由到多个数据源。
<bean id="commentsDataSource" class="com.nextep.proto.spring.ReadWriteDataSourceRouter">
    <property name="targetDataSources">
        <map key-type="java.lang.String">
            <entry key="READ" value="java:comp/env/jdbc/readdb"/>
            <entry key="WRITE" value="java:comp/env/jdbc/writedb"/>
        </map>
    </property>
    <property name="defaultTargetDataSource" value="java:comp/env/jdbc/readdb"/>
</bean>

我的路由器看起来就像下面这样:

public class ReadWriteDataSourceRouter extends AbstractRoutingDataSource {

@Override
protected Object determineCurrentLookupKey() {
    return TransactionSynchronizationManager.isCurrentTransactionReadOnly() ? "READ"
            : "WRITE";
}
}

我觉得这个方法很优雅,但问题在于Spring似乎在注入数据源后将事务设置为只读,所以它不起作用。我的简单测试是在只读方法中检查TransactionSynchronizationManager.isCurrentTransactionReadOnly()的结果(它为true),以及在determineCurrentLookupKey()方法中进行相同调用时为false。

如果你有什么想法...无论如何,你都可以基于除TransactionSynchronizationManager之外的任何其他东西来进行测试,这样就可以正常工作。

希望这能帮到你, Christophe


你把它搞定了吗?我也遇到了同样的问题。 - Papick G. Taboada
使用LazyConnectionDataSourceProxy创建您的@primary数据源。这将仅在运行第一个数据库语句时获取连接对象。 - machaxX
请查看 https://github.com/kwon37xi/replication-datasource。 - machaxX

0
<bean id="entityManagerFactory" 
    class="org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean">
    <property name="persistenceUnitName" value="filerp-pcflows" />
    <property name="dataSource" ref="pooledDS" />
    <property name="persistenceXmlLocation" value="classpath:powercenterCPCPersistence.xml" />
    <property name="jpaVendorAdapter">
        <bean class="org.springframework.orm.jpa.vendor.HibernateJpaVendorAdapter">
            <property name="showSql" value="true" />
            <!--<property name="formatSql" value="true" />
            --><property name="generateDdl" value="false" />
            <property name="database" value="DB2" />
        </bean>
    </property>
</bean>

-->

(注:此为HTML代码,无需翻译)

<bean id="pool" autowire-candidate="false" class="org.apache.commons.pool.impl.GenericObjectPool" destroy-method="close">
    <property name="minEvictableIdleTimeMillis" value="300000"/>
    <property name="timeBetweenEvictionRunsMillis" value="60000"/>
    <property name="maxIdle" value="2"/>
    <property name="minIdle" value="0"/>
    <property name="maxActive" value="8"/>
    <property name="testOnBorrow" value="true"/>
</bean>

<bean id="dsConnectionFactory" class="org.apache.commons.dbcp.DataSourceConnectionFactory">
    <constructor-arg><ref bean="dataSource" /></constructor-arg>
</bean> 
<bean id="poolableConnectionFactory" class="org.apache.commons.dbcp.PoolableConnectionFactory">
    <constructor-arg index="0"><ref bean="dsConnectionFactory" /></constructor-arg>
    <constructor-arg index="1"><ref bean="pool" /></constructor-arg>
    <constructor-arg index="2"><null /></constructor-arg>
    <constructor-arg index="3"><value>select 1 from ${cnx.db2.database.creator}.TPROFILE</value></constructor-arg>
    <constructor-arg index="4"><value>false</value></constructor-arg>
    <constructor-arg index="5"><value>true</value></constructor-arg>
</bean>

<bean id="pooledDS" class="org.apache.commons.dbcp.PoolingDataSource"
    depends-on="poolableConnectionFactory">
    <constructor-arg>
        <ref bean="pool" />
    </constructor-arg>
</bean> 
<import resource="powercenterCPCBeans.xml"/>


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接