Apache Camel与IBM MQ结合使用

10

大家好,有人曾经使用过Camel与IBM的MQ吗?我们考虑可能将这两个产品结合使用,但是没有这两个产品共同工作的示例。


1
您需要使用JMS将它们连接在一起。 - Souciance Eqdam Rashti
大家好,如果有人有结合Apache Camel和IBM MQ的演示程序,请发送邮件至sreekanthdev191@gmail.com或在此处评论Github链接,因为我对这两种技术都很新,并且找不到太多资源。提前感谢您的帮助。 - Srikanth Janapati
3个回答

8

我在使用IBM MQ和Camel方面有丰富的经验。两者一起使用没有问题。下面是我从一个Spring上下文文件中粘贴的示例配置,其中利用了Camel Jms端点、Spring连接工厂和IBM MQ定义。

Camel路由

from("someplace")
    .to("cpaibmmq:queue:myQueueName");

Spring 上下文

<bean name="cpaibmmq" class="org.apache.camel.component.jms.JmsComponent" destroy-method="doStop">
    <property name="transacted" value="${jms.transacted}" />
    <property name="concurrentConsumers" value="${cpa.concurrentConsumers}" />
    <property name="maxConcurrentConsumers" value="${cpa.concurrentConsumers}" />
    <property name="acceptMessagesWhileStopping" value="${jms.acceptMessagesWhileStopping}" />
    <property name="acknowledgementModeName" value="${jms.acknowledgementModeName}" />
    <property name="cacheLevelName" value="${jms.cacheLevelName}" />
    <property name="connectionFactory" ref="ibmFac1" />
    <property name="exceptionListener" ref="ibmFac1" />
</bean>

<bean id="ibmFac1" class="org.springframework.jms.connection.SingleConnectionFactory" destroy-method="destroy">
    <constructor-arg>
        <bean class="com.ibm.mq.jms.MQQueueConnectionFactory">
            <property name="transportType" value="1" />
            <property name="channel" value="${cpa.wmq.channel}" />
            <property name="hostName" value="${cpa.wmq.hostname}" />
            <property name="port" value="${cpa.wmq.port}" />
            <property name="queueManager" value="${cpa.wmq.mqmanager}" />
        </bean>
    </constructor-arg>
</bean>

似乎不需要指定queueManagerchannel属性。如果队列管理器在默认端口1414上有活动侦听器,那么指定port属性也是可选的,对吗?我认为作为jms应用程序和ibm mq之间的代理,侦听器确定目标队列管理器。此外,基本上在目标队列管理器中没有任何需要channel - faghani

4
我能够提供的最好的内容如下所述,它是一个Spring XML应用程序上下文,它本身托管CAMEL上下文和路由。此示例与IBM本机MQ JCA兼容资源适配器v7.5、CAMEL 2.16和Spring core 4.2一起使用。我已经在Glassfish、Weblogic和JBoss EAP7服务器上部署了它。
复杂性在于处理MQ报告的流程,其哲学与纯JMS回复消息相冲突。有关详细说明,请参阅使用Camel JMS组件实现原生Websphere MQ with CoD 基于CAMEL XML DSL的这个示例是自包含且易于测试的。
我们从Spring和CAMEL声明开始:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:util="http://www.springframework.org/schema/util" 
xmlns:context="http://www.springframework.org/schema/context" 
xmlns:camel="http://camel.apache.org/schema/spring"
xmlns:jee="http://www.springframework.org/schema/jee"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd   
    http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
    http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd
    http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
    http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-3.0.xsd">

CAMEL上下文有两个路由:从MQ到JMS和从JMS到MQ,这里链接在一起形成一个桥梁以便于测试。
<camel:camelContext id="mqBridgeCtxt">
<camel:route id="mq2jms" autoStartup="true">

奇怪的是,在Weblogic上,获取(例如)3个监听器的唯一方法是强制使用3个连接(按顺序使用3个Camel:from语句),每个连接最多1个会话,否则将出现MQ错误:MQJCA1018:每个连接只允许一个会话。在JBoss上,您可以简单地调整concurrentConsumers=...
  <camel:from uri="wmq:queue:TEST.Q1?concurrentConsumers=1&amp;disableReplyTo=true&amp;
        acknowledgementModeName=SESSION_TRANSACTED"/> 

上面的disableReplyTo选项确保CAMEL在测试MQ消息类型是否为1=请求(-回复)或8=数据报(单向!)之前不会产生回复。这个测试和回复构造在这里没有被说明。
然后,我们强制执行EIP对下一次发布到普通JMS的InOnly,以保持与入站MQ模式的一致性。
  <camel:setExchangePattern pattern="InOnly"/>
  <!-- camel:process ref="reference to your MQ message processing bean fits here" / -->
  <camel:to uri="ref:innerQueue" />
</camel:route>

这结束了MQ到jms的路由;接下来是在同一个CAMEL上下文中进行的jms到MQ的路由:
<camel:route id="jms2mq"  autoStartup="true">
  <camel:from uri="ref:innerQueue" />
  <!-- remove inner message headers and properties to test without inbound side effects! -->
  <camel:removeHeaders pattern="*"/> 
  <camel:removeProperties pattern="*" />
  <!-- camel:process ref="reference to your MQ message preparation bean fits here" / -->

现在需要请求标志以便远程目的地返回MQ CoD报告。我们还强制MQ消息为数据报类型(值为8)。
  <camel:setHeader headerName="JMS_IBM_Report_COD"><camel:simple resultType="java.lang.Integer">2048</camel:simple></camel:setHeader>
  <camel:setHeader headerName="JMS_IBM_Report_Pass_Correl_ID"><camel:simple resultType="java.lang.Integer">64</camel:simple></camel:setHeader>
  <camel:setHeader headerName="JMS_IBM_MsgType"><camel:simple resultType="java.lang.Integer">8</camel:simple></camel:setHeader>

可以通过ReplyTo uri选项或以下方式的标头来指定ReplyTo队列。

接下来,我们使用CamelJmsDestinationName标头来强制禁止JMS MQ消息头MQRFH2(使用targetClient MQ URL选项值1)。换句话说,我们想发送一个纯粹的MQ二进制消息(即仅包含MQMD消息描述符和有效负载)。

  <camel:setHeader headerName="JMSReplyTo"><camel:constant>TEST.REPLYTOQ</camel:constant></camel:setHeader>
  <camel:setHeader headerName="CamelJmsDestinationName"> <camel:constant>queue://MYQMGR/TEST.Q2?targetClient=1</camel:constant></camel:setHeader>

更多的MQMD字段可以通过保留的JMS属性进行控制,如下所示。请查看IBM文档中的限制。
  <camel:setHeader headerName="JMS_IBM_Format"><camel:constant>MQSTR   </camel:constant></camel:setHeader>
  <camel:setHeader headerName="JMSCorrelationID"><camel:constant>_PLACEHOLDER_24_CHARS_ID_</camel:constant></camel:setHeader>

URI中的目标队列被CamelJmsDestinationName覆盖,因此URI中的队列名称成为占位符。
URI选项preserveMessageQos是允许发送设置了ReplyTo数据的消息(以获取MQ CoD报告),但通过强制执行InOnly MEP来防止CAMEL实例化Reply消息侦听器的选项。
  <camel:to uri="wmq:queue:PLACEHOLDER.Q.NAME?concurrentConsumers=1&amp;
            exchangePattern=InOnly&amp;preserveMessageQos=true&amp;
            includeSentJMSMessageID=true" />
</camel:route>
</camel:camelContext>

我们还没有完成,我们仍然需要声明本机JMS提供程序和Websphere MQ(通过本机IBM WMQ JCA资源适配器)的队列工厂,并根据您的上下文进行调整。我们在此处使用JNDI查找管理对象。
<camel:endpoint id="innerQueue" uri="jmsloc:queue:transitQueue">
</camel:endpoint>

<jee:jndi-lookup id="mqQCFBean" jndi-name="jms/MYQMGR_QCF"/>
<jee:jndi-lookup id="jmsraQCFBean" jndi-name="jms/jmsra_QCF"/>

<bean id="jmsloc" class="org.apache.camel.component.jms.JmsComponent">
  <property name="connectionFactory" ref="jmsraQCFBean" />
</bean>

<bean id="wmq" class="org.apache.camel.component.jms.JmsComponent">
  <property name="connectionFactory" ref="mqQCFBean" />
</bean>

</beans>

除了从JNDI获取工厂(和JCA适配器)的方法外,另一种方法是将JMS客户端声明为Spring beans。在Weblogic和Glassfish中,最好部署本地IBM JCA资源适配器并创建JNDI资源,然后像上面那样在Spring上下文中引用,在JBoss中直接使用MQ客户端bean声明最合适,如下所示。

<bean id="mqCFBean" class="com.ibm.mq.jms.MQXAConnectionFactory">
    <property name="hostName" value="${mqHost}"/>
    <property name="port" value="${mqPort}"/>
    <property name="queueManager" value="${mqQueueManager}"/>
    <property name="channel" value="${mqChannel}"/>
    <property name="transportType" value="1"/> <!-- This parameter is fixed and compulsory to work with pure MQI java libraries -->
    <property name="appName" value="${connectionName}"/>
</bean>

<bean id="wmq" class="org.apache.camel.component.jms.JmsComponent">
    <property name="connectionFactory" ref="mqCFBean"/>
    <property name="transacted" value="true"/>
    <property name="acknowledgementModeName" value="AUTO_ACKNOWLEDGE"/>
</bean>

欢迎评论和改进。


1

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接