使用Java异步处理Amazon SQS队列中的消息

6

我很难弄清楚如何处理来自亚马逊 SQS 的消息。

我正在尝试实现以下内容:

  1. SQS 上的侦听器
  2. 从队列中处理消息并将其添加到数据库
  3. 从队列中删除已处理的消息

让我苦恼的是如何实施第2步。 我有一个名为 SQSConnector ProfileDao 的类。 现在,我想要简单的实现方式:通过在 ProfileDao 中初始化 SQSConnector 并从队列中接收消息。 我的想法是启动新线程,开始轮询消息,并在队列为空时从 ProfileDao 中停止该线程。

返回/处理消息的最佳方法是什么(回调函数?),如果有其他方法可以做到这一点,我也愿意考虑。

谢谢

1个回答

3

使用Java的ExecutorService, FutureConcurrentLinkedQueue,我已经完成了类似于SQS的任务。

ExecutorService创建一个线程池,可以执行实现Callable接口并返回Future的类。当ExecutorService创建这些futures时,我将它们推入在线程中运行并处理结果的ConcurrentLinkedQueue中。

实现检查SQS并异步启动工作:

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class SqsProcessor {

    private static final int THREAD_COUNT = 100;
    private ExecutorService _executor = null;
    private FutureResultProcessor futureResultProcessor = null;
    
    public SqsProcessor() {
        _executor = Executors.newFixedThreadPool(THREAD_COUNT);
        _futureResultProcessor = new FutureResultProcessor();
    }

    public void waitReceive() {

        // Receive a SQS message

        // Start the work related to the SQS message
        Callable<MyWorkerResult> sqsWorker = new MyWorker(sqsMessage);
        Future<MyWorkerResult> sqsFuture = _executor.submit(sqsWorker);

        // Send to the queue so the result can be processed when it completes
        _futureResultProcessor.add(sqsFuture);
    }
}

执行工作的类:

import java.util.concurrent.Callable;

public class MyWorker implements Callable<MyWorkerResult> {

    private String _sqsMessage = null;

    public MyWorker(String sqsMessage) {
        _sqsMessage = sqsMessage;
    }

    @Override
    public MyWorkerResult call() throws Exception {
        // Do work relating to the SQS message
    }
}

保存工作结果:
public class MyWorkerResult {
    // Results set in MyWorker call()
}

使用ConcurrentLinkedQueue来接收和处理未来的结果:

import java.util.concurrent.Future;
import java.util.concurrent.ConcurrentLinkedQueue;

public class FutureResultProcessor extends Thread {

    private final ConcurrentLinkedQueue<Future<MyWorkerResult>> resultQueue = new ConcurrentLinkedQueue<Future<MyWorkerResult>>();
    private final Integer CHECK_SLEEP = 300;

    public FutureResultProcessor() {
    }

    public void run() {
        while(true) {
            Future<MyWorkerResult> myFuture = resultQueue.poll();

            if(myFuture == null) {
                // There's nothing to process
                try { Thread.sleep(CHECK_SLEEP); } catch (InterruptedException e) {}
                continue;
            }

            // Process result
            if(myFuture != null) {

                MyFutureResult myFutureResult = myFuture.get();

                // Process result
            }
        }
    }

    public void add(Future<MyWorkerResult> sqsFuture) {
        resultQueue.offer(sqsFuture);
    }
}

或者你可以收集一组未来并等待它们全部完成后再处理结果。

Akka可能是一个很好的选择。我没有直接使用过它,但它提供了运行异步任务的框架,提供错误处理,甚至可以将任务分发到远程实例。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接