Akka演员和未来: 通过示例理解

26

我正在尝试学习Akka actors和futures,但在阅读http://akka.io文档和进行http://doc.akka.io/docs/akka/2.0.2/intro/getting-started-first-java.html之后,仍然无法理解。我猜很多人都可以与计算Pi的价值相联系,但我不行=)。我搜索了一下,但没有找到适合我的例子。因此,我想将我的一些实际代码放在这里,并用它交换一下如何使用Akka执行此操作的示例。

好的,我们开始吧:

我有一个Java Play2应用程序,需要从我的数据库中获取一些数据并在我的Elasticsearch实例中索引它们。

  1. 我调用数据库并获取场馆的ID。

  2. 然后我拆分列表并创建几个可调用的索引任务。

  3. 之后,我调用所有任务,每个任务从数据库中收集分配给其ID场馆。

  4. 对于每个场馆,将其索引到Elasticsearch实例并使其可搜索。

  5. 完成。

Application.java:

public class Application extends Controller {

  private static final int VENUE_BATCH = 1000;
  private static int size;

  public static Result index() {

      List<Long> venueIds = DbService.getAllVenueIds();
      size = venueIds.size();
      Logger.info("Will index " + size + " items in total.");
      ExecutorService service = Executors.newFixedThreadPool(getRuntime().availableProcessors());
      int startIx = 0;
      Collection<Callable<Object>> indexTasks = new ArrayList<Callable<Object>>();
      do {
          int endIx = Math.min(startIx + VENUE_BATCH, size);
          List<Long> subList = venueIds.subList(startIx, endIx);
          VenueIndexTask indexTask = new VenueIndexTask(subList);
          indexTasks.add(indexTask);
      } while ((startIx += VENUE_BATCH) < size);

      Logger.info("Invoking all tasks!");
      try {
          service.invokeAll(indexTasks);
      } catch (InterruptedException e) {
          e.printStackTrace();
      }

      return ok(index.render("Done indexing."));
  } 
}

VenueTask:

public class VenueIndexTask implements Callable<Object> {

    private List<Long> idSubList;

    public VenueIndexTask(List<Long> idSubList){
        this.idSubList = idSubList;
        Logger.debug("Creating task which will index " + idSubList.size() + " items. " +
                "Range: " + rangeAsString() + ".");
    }

    @Override
    public Object call() throws Exception {
        List<Venue> venues = DbService.getVenuesForIds(idSubList);
        Logger.debug("Doing some indexing: "+venues.size());

        for(Venue venue : venues) {
            venue.index();
        }
        return null;
    }
    private String rangeAsString() {
        return "[" + idSubList.get(0) + "-" + idSubList.get(idSubList.size() - 1) + "]";
    }
}

地点:

@IndexType(name = "venue")
public class Venue extends Index {

    private String name;

    // Find method static for request
    public static Finder<Venue> find = new Finder<Venue>(Venue.class);

    public Venue() {
    }

    public Venue(String id, String name) {
        super.id = id;
        this.name = name;
    }

    @Override
    public Map toIndex() {
        HashMap map = new HashMap();
        map.put("id", super.id);
        map.put("name", name);
        return map;
    }

    @Override
    public Indexable fromIndex(Map map) {
        if (map == null) {
            return this;
        }
        this.name = (String) map.get("name");
        return this;
    }
}

所以所有的Akka使用者都可以尽情地发挥!请提出可能用于未来的功能、任何其他知识/代码,我可以通过它们来学习这些东西。


1
这是一个很好的问题,但它可能更适合在codereview.stackexchange.com上。 - Ptharien's Flame
2个回答

22
我喜欢将Akka(或任何其他基于消息的系统)比作传送带,就像在工厂中一样。以订单披萨为例,对Actor的简化思考方式可能如下:
  • 您这位饥饿的顾客 (Actor/角色) 发送订单 (一个消息) 给披萨店

  • 客户服务 (Actor/角色) 接受您的订单并给您订单号码 (Future)

  • 如果您很不耐烦,您可能会在电话/互联网/商店等待直到收到披萨 (同步/阻塞交易),否则您可以满意地用订单号稍后查看订单状态 (非阻塞)

  • 客户服务将消息发送给由厨房经理(Actor)监督的主厨 (Actor)。这是一个非常流程密集型的厨房,有层级结构。Akka喜欢这个。请参见监管

  • 主厨创建一个新的披萨,并附上订单的详细信息 (一个新消息),然后通过送货经理 (监管Actor) 将其传递给送货员 (Actor)。

  • 在此过程中,您的订单详细信息没有改变,否则将是一场噩梦。如果您想要纯奶酪而得到了意大利辣香肠披萨,那您就会不高兴!所有消息都应该是不可变的!然而,对于不同的角色而言,消息可能是不同的。送货员希望得到一个披萨和附加的订单细节,厨师希望得到一个订单。当消息需要更改时,将创建一个新消息。

  • 每个Actor都擅长一种角色,如果一个人必须完成所有任务,那么效率会如何?某些角色可能比其他角色数量多(例如:10个线程用于主厨,2个用于送货员,1个客户服务)。

  • 阻塞行为很烦人,如果客户服务在看下一个客户之前等待厨师和送货员呢?

希望我对你有所帮助,这是一个庞大的话题和思维上的重大变化。祝你好运。


1
Coursera目前开设了一门关于响应式编程的课程,其中包括最后三节关于Akka和Actor模型的课程。这包括视频讲座和作业(尽管是Scala而不是Java)。虽然你已经来晚了无法获得完整的证书,但你仍然可以加入课程并查看最后三周的内容。

https://class.coursera.org/reactive-001/class


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接