Java示例:如何在AKKA中发送非阻塞的HTTP请求

4

AKKA文档中写道:

... Actor不应该在某些外部实体上阻塞(即被动地等待,同时占用线程),这可能是锁定、网络套接字等。 阻塞操作应在一些特殊情况下的线程中完成,该线程向将处理它们的Actor发送消息。 来源 http://doc.akka.io/docs/akka/2.0/general/actor-systems.html#Actor_Best_Practices

目前我找到了以下信息:

  1. 我阅读了从Akka/Scala发送出站HTTP请求,并查看了https://github.com/dsciamma/fbgl1上的示例。

  2. 我发现以下文章http://nurkiewicz.blogspot.de/2012/11/non-blocking-io-discovering-akka.html解释了如何使用https://github.com/AsyncHttpClient/async-http-client非阻塞http客户端与akka一起使用。但是它是用Scala编写的。

我该如何编写一个能够进行非阻塞http请求的Actor?

它必须将远程URL页面下载为文件,然后将生成的文件对象发送给Master Actor。 Master Actor然后将此请求发送到解析器Actor以解析该文件...


1
异步 I/O 通常在服务器端使用,以支持最小资源下的数千个连接。你真的想在客户端程序中拥有如此多的连接吗?如果不是,只需在单独的线程上运行普通的同步客户端(示例在其他地方),等待文件下载完成,然后将其发送给主 actor。这是最简单的方法,并且易于调试。 - Alexei Kaigorodov
1
我想编写一个爬虫并在多台服务器上运行。因此,我希望将其设置为非阻塞,以便符合Akka标准。是否可以在Actor中创建一个单独的线程,并在线程完成时向主节点发送消息? - Koray Güclü
2个回答

4
在上一条消息中,Koray错误地引用了发件人的参考信息,正确的方法是:
public class ReduceActor extends UntypedActor {

@Override
public void onReceive(Object message) throws Exception {
    if (message instanceof URI) {
        URI url = (URI) message;


        AsyncHttpClient asyncHttpClient = new AsyncHttpClient();
        final ActorRef sender = getSender();
        asyncHttpClient.prepareGet(url.toURL().toString()).execute(new AsyncCompletionHandler<Response>() {

            @Override
            public Response onCompleted(Response response) throws Exception {
                File f = new File("e:/tmp/crawler/" + UUID.randomUUID().toString() + ".html");
                // Do something with the Response
                // ...
                // System.out.println(response1.getStatusLine());
                FileOutputStream fao = new FileOutputStream(f);
                IOUtils.copy(response.getResponseBodyAsStream(), fao);
                System.out.println("File downloaded " + f);
                sender.tell(new WordCount(f));
                return response;
            }

            @Override
            public void onThrowable(Throwable t) {
                // Something wrong happened.
            }
        });
    } else
        unhandled(message);
  }

请查看这个关于akka的其他讨论:https://stackoverflow.com/a/11899690/575746


0

我已经以这种方式实现了它。

public class ReduceActor extends UntypedActor {
@Override
public void onReceive(Object message) throws Exception {
    if (message instanceof URI) {
        URI url = (URI) message;

        AsyncHttpClient asyncHttpClient = new AsyncHttpClient();

        asyncHttpClient.prepareGet(url.toURL().toString()).execute(new AsyncCompletionHandler<Response>() {

            @Override
            public Response onCompleted(Response response) throws Exception {
                File f = new File("e:/tmp/crawler/" + UUID.randomUUID().toString() + ".html");
                // Do something with the Response
                // ...
                // System.out.println(response1.getStatusLine());
                FileOutputStream fao = new FileOutputStream(f);
                IOUtils.copy(response.getResponseBodyAsStream(), fao);
                System.out.println("File downloaded " + f);
                getSender().tell(new WordCount(f));
                return response;
            }

            @Override
            public void onThrowable(Throwable t) {
                // Something wrong happened.
            }
        });
    } else
        unhandled(message);
}

1
这不是完全异步的解决方案:asyncHttpClient在内存中收集所有块,并在最后调用onCompleted()。更合理的做法是在接收到每个块时直接将其写入磁盘。此外,可以以异步非阻塞方式进行磁盘写入。 - Alexei Kaigorodov
AsyncHttpClient库支持零字节复制响应,例如复制到文件,详见:http://sonatype.github.io/async-http-client/zero-bytes-copy.html 文件关闭将在AsyncHandler接口的onCompleted()方法中完成。 - Marek Gregor
2
我认为你不应该在onComplete中调用"ReduceActor#getSender()"。请记住,当onCompleted(...)执行时,ReduceActor可能已经从不同的发送者接收到了不同的消息。你想要做的是在onReceive()中将getSender()分配给一个final变量,并在onCompleted(...)中引用它。(希望这样说得清楚)。 - Jack Leow

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接