同步异步通信的同步化

4

我有一个REST服务,它接收一些数据,并通过异步的IBM MQ请求来检查这些数据。

REST控制器:

@RestController
@RequestMapping("/request")
public class RequestController {

    @RequestMapping(method = RequestMethod.POST)
    public Response postRequest(@RequestBody Request request) {

        String data = request.getData();

        jmsSender.send(data);

        // Now I need the response from MQ
        // String mqResponse = ...
        if (mqIsValid(mqResponse)) {
            return createValidResponse();
        }
        return createNotValidResponse();
    }
}

MQ发送者:
@Service
public class JmsSender {

    public void send(String data) {
        jmsTemplate.convertAndSend("QUEUE.TO.MQ", data);
    }

}

MQ接收器:
@Component
public class JmsReceiver {

    @JmsListener(destination = "QUEUE.FROM.MQ, containerFactory = "DefaultJmsListenerContainerFactory")
    public void receiveMessage(String message) {
        // How to pass the message to the controller?
    }

}

如何等待MQ正确的数据以在控制器中创建正确的响应?

是否可以像这里描述的那样使用BlockingQueue来实现?在我的情况下,我必须区分数据。我不能只从阻塞队列中取第一个数据。

例如,如果有两个REST请求(数据为:abcxyz),如何确保响应正确的答案而不仅仅是从MQ得到的第一个答案?

我也无法更改MQ接口。


1
我不知道你是否可以使用 JMS 实现此目标。几个月前我也需要实现类似的功能,但是我必须使用 IBM MQ 类来实现。如果你可以使用 MQ 类,你需要设置并使用一个唯一的 correlationId 来将响应与请求匹配。 - Kevyn Meganck
谢谢,我会研究一下。实际上数据本身包含一个唯一的ID,我可以用它来识别它。但是我该如何告诉REST控制器等待MQ的响应并将响应传递给控制器呢? - deve
3个回答

1
尝试使用以下类似的 CountDownLatch。
@RestController
@RequestMapping("/request")
public class RequestController {

    @RequestMapping(method = RequestMethod.POST)
    public Response postRequest(@RequestBody Request request) {
        final CountDownLatch jmsLatch = new CountDownLatch (1);

        String data = request.getData();

        jmsSender.send(data, jmsLatch);

        try {
            latch.await();  // wait untill latch counted down to 0
        } catch (InterruptedException e) {
            return createNotValidResponse();
        }

        return createValidResponse();
    }
}

修改发送方法以从控制器获取CountDownLatch。
@Service
public class JmsSender {

    public void send(String data, final CountDownLatch jmsLatch) {
        jmsLatch.await();
        jmsTemplate.convertAndSend("QUEUE.TO.MQ", data);
    }

}

修改接收方法以从控制器获取相同的CountDownLatch。
@Component
public class JmsReceiver {

    @JmsListener(destination = "QUEUE.FROM.MQ", containerFactory = "DefaultJmsListenerContainerFactory")
    public void receiveMessage(String message, final CountDownLatch jmsLatch) {
        // Pass the message to the controller
        jmsLatch.countDown();
    }

}

这里的诀窍是你需要将同一个CountDownLatch实例从控制器传递给发送方和接收方类,并在接收到消息后调用countDown方法。

1
你使用 CountDownLatch 的原因是什么?不能使用 Object#waitObject#notify 或像 ReentrantLock 这样的监视器来实现相同的功能吗?如果例如向 MQ 发送了两个请求,而第一个 MQ 响应是针对第二个请求的,我该如何确保获得正确的响应? - deve
当我面对类似的情况时,我更喜欢使用 CounDownLatch。但是如果我们采用等待/通知方法,你会在哪个对象上使用 wait/notify? - Avinash
对于你的第二个问题,你需要向我展示你获取值的代码。 - Avinash

0

使用jms(activemq)实现请求-响应模式的同步异步场景解决方案

这个例子的思路是在两个不同的服务中使用不同的jvm。该解决方案已经与多个实例服务并发测试通过:

  • 服务 1 (M1) - Rest api 同步调用,然后使用 activemq 开始异步流程来调用第二个实现集成模式 请求-响应 的 M2 服务。您不需要停止或等待任何线程,jms 模式实现了 ack Session.AUTO_ACKNOWLEDGE

    @PostMapping
    public AnyDto sendMessage(final AnyDto anyDto) {
        return routeService.send(anyDto);
    }
    public void flowOrchestation (final anyDto data) throws JMSException {
        final ObjectMessage objectMessage = composeTemplateMessage(data);
        final AnyDto responseDto = jmsMessagingTemplate.convertSendAndReceive(new ActiveMQQueue("queue.request"),
                objectMessage, AnyDto.class);
    }
    private ObjectMessage composeTemplateMessage(final AnyDto data) throws JMSException {
    
        jmsTemplate.setReceiveTimeout(10000L);
        jmsMessagingTemplate.setJmsTemplate(jmsTemplate);
    
        Session session = jmsMessagingTemplate.getConnectionFactory().createConnection()
                .createSession(false, Session.AUTO_ACKNOWLEDGE);
    
        final ObjectMessage objectMessage = session.createObjectMessage(data);
    
        objectMessage.setJMSCorrelationID(UUID.randomUUID().toString());
        objectMessage.setJMSReplyTo(new ActiveMQQueue("queue.response"));
        objectMessage.setJMSExpiration(0);
        objectMessage.setJMSDeliveryMode(DeliveryMode.NON_PERSISTENT);
        return objectMessage;
    }
    
超时和过期时间可以根据您的需求进行修改。0 过期时间表示不会过期。
服务2(M2):只接收信息并做出响应,JmsReplyTo 设置在 M1 上。

@Component public class Consumer implements SessionAwareMessageListener<Message> {
@Override @JmsListener(destination = "${queue.request}") public void onMessage(Message message, Session session) throws JMSException { AnyDto anyDto = (AnyDto) ((ActiveMQObjectMessage) message).getObject();
//做一些事情
final ObjectMessage responseMessage = new ActiveMQObjectMessage(); responseMessage.setJMSCorrelationID(message.getJMSCorrelationID()); responseMessage.setObject(dtoModified);
final MessageProducer producer = session.createProducer(message.getJMSReplyTo()); producer.send(responseMessage); } }


0

由于我找不到适合我的解决方案,所以我创建了一个简单的等待机制来获取数据。

MqReceiver:

@Component
public class JmsReceiver {

    private final Lock lock;
    private final Condition containsKey;
    private final Map<String, String> responses;

    public JmsReceiver() {
        this.lock = new ReentrantLock();
        this.containsKey = lock.newCondition();
        this.responses = new HashMap<>();
    }

    @JmsListener(destination = "QUEUE.FROM.MQ", containerFactory = "DefaultJmsListenerContainerFactory")
    public void receiveMessage(String message) {
        put(getKeyFromMessage(message), message);
    }

    public String get(String key) throws InterruptedException {
        lock.lock();
        try {
            while (!responses.containsKey(key)) {
                containsKey.await();
            }
            return responses.get(key);
        } finally {
            lock.unlock();
        }
    }

    public void put(String key, String messagee) {
        lock.lock();
        try {
            responses.put(key, messagee);
            containsKey.signalAll();
        } finally {
            lock.unlock();
        }
    }

}

这可以在控制器中使用:

@RestController
@RequestMapping("/request")
public class RequestController {

    @RequestMapping(method = RequestMethod.POST)
    public Response postRequest(@RequestBody Request request) {

        String data = request.getData();

        jmsSender.send(data);

        String key = getKeyFromData(data);
        // waits until MQ sends the data
        String mqResponse = jmsReceiver.get(key);

        if (mqIsValid(mqResponse)) {
            return createValidResponse();
        }
        return createNotValidResponse();
    }
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接