Spring RedisConnectionFactory在使用事务时没有将连接返回到池中,当连接耗尽时会阻塞。

16

我的配置用于创建带有连接池的连接工厂。我确实有一个连接池。这个代码的大部分来自于Spring的RedisAutoConfiguration,但我已经因为特殊原因禁用了它。

@Configuration
@EnableConfigurationProperties(RedisProperties.class)
public class JedisConfiguration implements RedisConfiguration {

    @Bean
    @Scope("prototype")
    @Override
    public RedisConnectionFactory connectionFactory(RedisProperties redisProperties) {
        return createFactory(redisProperties);
    }

    private static JedisConnectionFactory applyProperties(RedisProperties properties, JedisConnectionFactory factory) {
        factory.setHostName(properties.getHost());
        factory.setPort(properties.getPort());
        factory.setDatabase(properties.getDatabase());
        return factory;
    }

    private static JedisPoolConfig jedisPoolConfig(RedisProperties properties) {
        return Optional.ofNullable(properties.getPool())
                       .map(props -> {
                           JedisPoolConfig config = new JedisPoolConfig();
                           config.setMaxTotal(props.getMaxActive());
                           config.setMaxIdle(props.getMaxIdle());
                           config.setMinIdle(props.getMinIdle());
                           config.setMaxWaitMillis(props.getMaxWait());
                           return config;
                       })
                       .orElseGet(JedisPoolConfig::new);
    }

    public static JedisConnectionFactory createFactory(RedisProperties properties) {
        return applyProperties(properties, new JedisConnectionFactory(jedisPoolConfig(properties)));
    }
}

使用案例

我有字符串键"A""B""C",分别映射到具有字符串哈希键和哈希值的哈希映射中。这些哈希值是从类ABC序列化成JSON得到的。

也就是说,"A" -> A::toString -> json(A)BC同理。

@Component
public final class UseCase implements InitializingBean {

    private static final String A_KEY = "A";
    private static final String B_KEY = "B";
    private static final String C_KEY = "C";

    private final RedisConnectionFactory factory;
    private final ObjectMapper objectMapper;
    private HashOperations<String, String, A> aMap;
    private HashOperations<String, String, B> bMap;
    private HashOperations<String, String, C> cMap;
    private RedisTemplate<String, ?> template;

    private UseCase(RedisConnectionFactory factory, ObjectMapper objectMapper) {
        this.factory = factory;
        this.objectMapper = objectMapper;
    }

    private <T> RedisTemplate<String, ?> hashMap(Class<T> vClass) {
        RedisTemplate<String, ?> redisTemplate = new RedisTemplate<>();
        redisTemplate.setKeySerializer(stringSerializer());
        redisTemplate.setHashKeySerializer(stringSerializer());
        redisTemplate.setHashValueSerializer(jacksonSerializer(vClass));
        return configure(redisTemplate);
    }


    private <K, V> RedisTemplate<K, V> configure(RedisTemplate<K, V> redisTemplate) {
        redisTemplate.setConnectionFactory(factory);
        redisTemplate.setEnableTransactionSupport(true);
        redisTemplate.afterPropertiesSet();
        return redisTemplate;
    }

    private <T> RedisSerializer<T> jacksonSerializer(Class<T> clazz) {
        Jackson2JsonRedisSerializer<T> serializer = new Jackson2JsonRedisSerializer<>(clazz);
        serializer.setObjectMapper(objectMapper);
        return serializer;
    }

    private RedisSerializer<String> stringSerializer() {
        return new StringRedisSerializer();
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        template = hashMap(String.class);
        aMap = hashMap(A.class).opsForHash();
        bMap = hashMap(B.class).opsForHash();
        cMap = hashMap(C.class).opsForHash();
    }

    void put(A a, B b, C c) {
        template.multi();
        aMap.put(A_KEY, a.toString(), a);
        bMap.put(B_KEY, b.toString(), b);
        cMap.put(C_KEY, c.toString(), c);
        template.exec();
    }

    A getA(String aKey) {
        return aMap.get(A_KEY, aKey);
    }

}

期望

  1. 通过一个连接执行put操作,并在连接丢失或损坏时失败。
  2. 对于put操作,应在multi调用时获取连接,并在exec调用后将其返回到池中。
  3. 对于getA操作,在执行后将连接返回到池中。

我有测试来证明第1点的有效性,但对于最后两个点我还有些怀疑。经过调试,我观察到在这两个操作后连接没有被返回到池中,导致池在用尽时阻塞。

虽然已尝试返回连接,但由于以下两个分支失败,未能成功调用。取自RedisConnectionUtils

// release transactional/read-only and non-transactional/non-bound connections.
// transactional connections for read-only transactions get no synchronizer registered
if (isConnectionTransactional(conn, factory)
        && TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {
    unbindConnection(factory);
} else if (!isConnectionTransactional(conn, factory)) {
    if (log.isDebugEnabled()) {
        log.debug("Closing Redis Connection");
    }
    conn.close();
}

问题

  1. 我做错了什么?
  2. 为什么连接没有返回到池中?
  3. 我该怎么解决这个问题,以便将连接返回到池中?
1个回答

4
我认为问题在于调用exec()并未告诉模板您已经完成了连接,因此无法将其返回到池中。
根据文档,您应该将代码包装在SessionCallback中,并使用RedisTemplate.execute(SessionCallback<T> callback)执行它,在回调执行后将连接返回到池中。
像这样:
template.execute(new SessionCallback<List<Object>>() {
    public List<Object> execute(RedisOperations operations) throws DataAccessException {
        operations.multi();
        aMap.put(A_KEY, a.toString(), a);
        bMap.put(B_KEY, b.toString(), b);
        cMap.put(C_KEY, c.toString(), c);
        return operations.exec();
    }
});

Spring Data Redis还支持@Transactional,它会自动为您绑定/解绑连接,但需要您在可拦截的bean中实现该方法(即不能是final),并且只有在从bean外部执行时(即不是从同一类或子/父类的另一个方法中执行时)才会启动事务。
您已经通过redisTemplate.setEnableTransactionSupport(true);在模板上启用了事务支持,所以可以开始使用了。
@Transactional
public void put(A a, B b, C c) {
    aMap.put(A_KEY, a.toString(), a);
    bMap.put(B_KEY, b.toString(), b);
    cMap.put(C_KEY, c.toString(), c);
}

感谢您的回答。使用SessionCallback执行似乎会将连接返回到池中。您有什么想法为什么只读操作不会将连接返回到池中吗? - Olayinka
2
据我所知,这是因为返回连接池的方法从未被调用,除非您使用execute - Raniz
非常感谢。我想我可以处理需要在每个调用中包装一个SessionCallback的情况。 - Olayinka
1
很高兴我可以帮到你! - Raniz

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接