使用Java的ExecutorService, Future和ConcurrentLinkedQueue,我已经完成了类似于SQS的任务。
ExecutorService创建一个线程池,可以执行实现Callable接口并返回Future的类。当ExecutorService创建这些futures时,我将它们推入在线程中运行并处理结果的ConcurrentLinkedQueue中。
实现检查SQS并异步启动工作:
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class SqsProcessor {
private static final int THREAD_COUNT = 100;
private ExecutorService _executor = null;
private FutureResultProcessor futureResultProcessor = null;
public SqsProcessor() {
_executor = Executors.newFixedThreadPool(THREAD_COUNT);
_futureResultProcessor = new FutureResultProcessor();
}
public void waitReceive() {
Callable<MyWorkerResult> sqsWorker = new MyWorker(sqsMessage);
Future<MyWorkerResult> sqsFuture = _executor.submit(sqsWorker);
_futureResultProcessor.add(sqsFuture);
}
}
执行工作的类:
import java.util.concurrent.Callable;
public class MyWorker implements Callable<MyWorkerResult> {
private String _sqsMessage = null;
public MyWorker(String sqsMessage) {
_sqsMessage = sqsMessage;
}
@Override
public MyWorkerResult call() throws Exception {
}
}
保存工作结果:
public class MyWorkerResult {
}
使用ConcurrentLinkedQueue来接收和处理未来的结果:
import java.util.concurrent.Future;
import java.util.concurrent.ConcurrentLinkedQueue;
public class FutureResultProcessor extends Thread {
private final ConcurrentLinkedQueue<Future<MyWorkerResult>> resultQueue = new ConcurrentLinkedQueue<Future<MyWorkerResult>>();
private final Integer CHECK_SLEEP = 300;
public FutureResultProcessor() {
}
public void run() {
while(true) {
Future<MyWorkerResult> myFuture = resultQueue.poll();
if(myFuture == null) {
try { Thread.sleep(CHECK_SLEEP); } catch (InterruptedException e) {}
continue;
}
if(myFuture != null) {
MyFutureResult myFutureResult = myFuture.get();
}
}
}
public void add(Future<MyWorkerResult> sqsFuture) {
resultQueue.offer(sqsFuture);
}
}
或者你可以收集一组未来并等待它们全部完成后再处理结果。
Akka可能是一个很好的选择。我没有直接使用过它,但它提供了运行异步任务的框架,提供错误处理,甚至可以将任务分发到远程实例。