Apache Camel 创建消费者组件

3

我是Apache Camel的新手。在HP NonStop中,有一个接收器可以接收由事件管理器生成的事件,类似于流。我的目标是设置一个消费者端点,接收传入的消息并通过Camel进行处理。

另一个端点,我只需要将其写入日志中。从我的研究中,我了解到对于消费者端点,我需要创建自己的组件,配置如下:

   from("myComp:receive").to("log:net.javaforge.blog.camel?level=INFO")

这是我的代码片段,用于从事件系统接收消息。

 Receive receive = com.tandem.ext.guardian.Receive.getInstance();
    byte[] maxMsg = new byte[500]; // holds largest possible request 
    short errorReturn = 0;
    do { // read messages from $receive until last close 
        try {
            countRead = receive.read(maxMsg, maxMsg.length);
            String receivedMessage=new String(maxMsg, "UTF-8");
            //Here I need to handover receivedMessage to camel 

        } catch (ReceiveNoOpeners ex) {
            moreOpeners = false;
        } catch(Exception e) {
            moreOpeners = false;
        }
    } while (moreOpeners);

有人能够给一些提示,如何将这个作为消费者?
2个回答

7
这是一个总体概述:
您需要从实现组件开始。最简单的方法是扩展org.apache.camel.impl.DefaultComponent。您唯一需要做的就是覆盖DefaultComponent :: createEndpoint(..)。显然,它所做的就是创建您的端点。
因此,您需要实现自己的端点。为此,请扩展org.apache.camel.impl.DefaultEndpoint。至少覆盖DefaultEndpoint :: createConsumer(Processor)以创建自己的消费者。
最后但并非最不重要的是,您需要实现消费者。同样,最好扩展org.apache.camel.impl.DefaultConsumer。消费者是生成消息的代码所在的地方。通过构造函数,您会收到对端点的引用。使用端点引用创建一个新的Exchange,填充它并将其沿着路线发送。大致如下:
Exchange ex = endpoint.createExchange(ExchangePattern.InOnly);
setMyMessageHeaders(ex.getIn(), myMessagemetaData);
setMyMessageBody(ex.getIn(), myMessage);

getAsyncProcessor().process(ex, new AsyncCallback() {
    @Override
    public void done(boolean doneSync) {
        LOG.debug("Mssage was processed " + (doneSync ? "synchronously" : "asynchronously"));
    }
});

我建议你选择一个简单的组件(例如DirectComponent)作为示例进行学习。


谢谢您的回答,我已经创建了MessageComponentMessageEndpointMessageProducerMessageConsumerMessageConsumer继承自DefaultConsumer。我找不到一个方法来处理我的消息。我需要在构造函数中添加它吗? - vels4j
覆盖DefaultConsumerdoStart()doStop()方法以启动/停止您的外部消息订阅/轮询。在我的情况下,我在我的消费者中实现了一个回调方法,每当我接收到外部消息时就会调用它。在其中,我创建头和正文,并将其设置在新的Exchange上,然后将消息沿着路由发送。 - Ralf

0

在此附上我的消费者组件,希望能对某些人有所帮助。

public class MessageConsumer extends DefaultConsumer {

private final MessageEndpoint endpoint;

private boolean moreOpeners = true;

public MessageConsumer(MessageEndpoint endpoint, Processor processor) {
    super(endpoint, processor);
    this.endpoint = endpoint;

}


 @Override
protected void doStart() throws Exception {

    int countRead=0; // number of bytes read 

    do { 
         countRead++;
            String msg = String.valueOf(countRead)+" "+System.currentTimeMillis();
            Exchange ex = endpoint.createExchange(ExchangePattern.InOnly);
            ex.getIn().setBody(msg);
            getAsyncProcessor().process(ex, new AsyncCallback() {
                @Override
                public void done(boolean doneSync) {
                    log.info("Mssage was processed " + (doneSync ? "synchronously" : "asynchronously"));
                }
            });
            // This is an echo server so echo request back to requester    

    } while (moreOpeners);
}

@Override
protected void doStop() throws Exception {
    moreOpeners = false;
    log.debug("Message processor is shutdown");
}

}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接