在RabbitMQ中配置消费者取消功能

3
我们正在使用一个2节点主动-主动的RabbitMQ集群,并使用镜像队列。其中镜像策略为:

"policies":[{"vhost":"/","name":"ha-all","pattern":"","apply->to":"all","definition":{"ha-mode":"all","ha-sync-mode":"automatic"},"priority":0}]

版本信息:RabbitMQ 3.5.4,Erlang 17.4,spring-amqp/spring-rabbit:1.4.5.RELEASE。
现在,我们正在尝试实现消费者取消,如高可用队列中所述。
然而,由于我们没有使用通道,因此无法使用上述链接中给出的{{basicConsumer}}方法。
我该如何在配置中将"x-cancel-on-ha-failover"设置为true?
beans xml 如下:
 <rabbit:connection-factory id="connectionFactory"  
  addresses="localhost:5672"  
  username="guest"  
  password="guest"
  channel-cache-size="5" />


<!-- CREATE THE JsonMessageConverter BEAN -->
<bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.JsonMessageConverter" />


 <!-- Spring AMQP Template -->  
<rabbit:template id="amqpTemplate" connection-factory="connectionFactory" retry-template="retryTemplate" message-converter="jsonMessageConverter" />

 <!-- in case connection is broken then Retry based on the below policy -->  
 <bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate">  
<property name="backOffPolicy">  
  <bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy">  
  <property name="initialInterval" value="500" />  
  <property name="multiplier" value="2" />  
  <property name="maxInterval" value="30000" />  
 </bean>  
</property>  
</bean>  

<rabbit:queue name="testQueue" durable="true">
    <rabbit:queue-arguments>
       <entry key="x-max-priority">
           <value type="java.lang.Integer">10</value> 
       </entry>
    </rabbit:queue-arguments>
</rabbit:queue>

<bean id="messsageConsumer"  class="consumer.RabbitConsumer">
</bean>
<rabbit:listener-container
  connection-factory="connectionFactory" concurrency="5" max-concurrency="5"     message-converter="jsonMessageConverter">
<rabbit:listener queues="testQueue" ref="messsageConsumer" />
</rabbit:listener-container>
1个回答

1
<rabbit:listener-container> 实际上在后台填充了一个 SimpleMessageListenerContainer bean。最后一个支持有关此事的 public void setConsumerArguments(Map<String, Object> args)
因此,为了满足您的要求,您只需要为您的 messsageConsumer 构建原始的 SimpleMessageListenerContainer <bean>
与此同时,当您为应用程序修复它时,我会要求您提供有关添加 <consumer-arguments> 组件的 JIRA。我们可以在当前 GA 截止日期前解决这个问题。

谢谢Artem。SimpleMessageListenerContainer bean 似乎有效。然而,取消消费者并没有取消/停止正在执行消息的线程。该消息已被重新排队并处理了两次。 - Sahil J

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接