我正在尝试学习Akka actors和futures,但在阅读http://akka.io文档和进行http://doc.akka.io/docs/akka/2.0.2/intro/getting-started-first-java.html之后,仍然无法理解。我猜很多人都可以与计算Pi的价值相联系,但我不行=)。我搜索了一下,但没有找到适合我的例子。因此,我想将我的一些实际代码放在这里,并用它交换一下如何使用Akka执行此操作的示例。
好的,我们开始吧:
我有一个Java Play2应用程序,需要从我的数据库中获取一些数据并在我的Elasticsearch实例中索引它们。
我调用数据库并获取场馆的ID。
然后我拆分列表并创建几个可调用的索引任务。
之后,我调用所有任务,每个任务从数据库中收集分配给其ID场馆。
对于每个场馆,将其索引到Elasticsearch实例并使其可搜索。
完成。
Application.java:
public class Application extends Controller {
private static final int VENUE_BATCH = 1000;
private static int size;
public static Result index() {
List<Long> venueIds = DbService.getAllVenueIds();
size = venueIds.size();
Logger.info("Will index " + size + " items in total.");
ExecutorService service = Executors.newFixedThreadPool(getRuntime().availableProcessors());
int startIx = 0;
Collection<Callable<Object>> indexTasks = new ArrayList<Callable<Object>>();
do {
int endIx = Math.min(startIx + VENUE_BATCH, size);
List<Long> subList = venueIds.subList(startIx, endIx);
VenueIndexTask indexTask = new VenueIndexTask(subList);
indexTasks.add(indexTask);
} while ((startIx += VENUE_BATCH) < size);
Logger.info("Invoking all tasks!");
try {
service.invokeAll(indexTasks);
} catch (InterruptedException e) {
e.printStackTrace();
}
return ok(index.render("Done indexing."));
}
}
VenueTask:
public class VenueIndexTask implements Callable<Object> {
private List<Long> idSubList;
public VenueIndexTask(List<Long> idSubList){
this.idSubList = idSubList;
Logger.debug("Creating task which will index " + idSubList.size() + " items. " +
"Range: " + rangeAsString() + ".");
}
@Override
public Object call() throws Exception {
List<Venue> venues = DbService.getVenuesForIds(idSubList);
Logger.debug("Doing some indexing: "+venues.size());
for(Venue venue : venues) {
venue.index();
}
return null;
}
private String rangeAsString() {
return "[" + idSubList.get(0) + "-" + idSubList.get(idSubList.size() - 1) + "]";
}
}
地点:
@IndexType(name = "venue")
public class Venue extends Index {
private String name;
// Find method static for request
public static Finder<Venue> find = new Finder<Venue>(Venue.class);
public Venue() {
}
public Venue(String id, String name) {
super.id = id;
this.name = name;
}
@Override
public Map toIndex() {
HashMap map = new HashMap();
map.put("id", super.id);
map.put("name", name);
return map;
}
@Override
public Indexable fromIndex(Map map) {
if (map == null) {
return this;
}
this.name = (String) map.get("name");
return this;
}
}
所以所有的Akka使用者都可以尽情地发挥!请提出可能用于未来的功能、任何其他知识/代码,我可以通过它们来学习这些东西。