使用Netty和Scala actors进行异步http请求

6

使用Netty和Scala Actors进行异步http请求

希望有人能帮我一下。

我正在尝试使用Scala Actors和Netty.io库来进行异步http请求。(是的,我知道Scala Actors即将被弃用,但这对我来说是一个学习练习)

我编写了一个actor HttpRequestActor,它接受一个形式为case class RequestPage(uri:URI)的消息。

当它收到消息时,它会创建必要的Netty对象以进行http请求,我大部分代码都是基于[HttpSnoopClient] (http://static.netty.io/3.5/xref/org/jboss/netty/example/http/snoop/HttpSnoopClient.html) 示例编写的。

我创建了一个客户端并将当前actor实例传递给我的ChannelPipelineFactory实现,它还将actor传递给我的SimpleChannelUpstreamHandler实现,在那里我重写了messageReceived函数。

actor实例作为监听器传递,我使用DefaultHttpRequest类创建一个请求并写入通道以进行请求。

使用从写入通道返回的ChannelFuture对象调用actor对象的阻塞调用。当我的处理程序类的messageRecieved函数被调用时,我将netty http请求的响应解析为字符串,将响应内容发送回actor,并关闭通道。

完成future后,我的代码尝试向调用的actor发送一个带有http内容响应的回复。

代码可以运行,我能够获得回复,将其发送给我的actor实例,打印出内容并向actor实例发送使用的资源释放消息。

问题在于当我测试它时,原始对actor的调用不会收到回复,线程会一直保持开启状态。

代码示例 - HttpRequestActor

我的HttpRequestActor类的代码:

    import scala.actors.Actor
import java.net.{InetSocketAddress,URI}
import org.jboss.netty.handler.codec.http._
import org.jboss.netty.bootstrap.ClientBootstrap
import org.jboss.netty.channel.Channel
import org.jboss.netty.channel._
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
import org.jboss.netty.channel.group.DefaultChannelGroup
import java.util.concurrent.{Executors,CancellationException}
import org.jboss.netty.util.CharsetUtil
import scala.concurrent.{ Promise, Future }
import scala.concurrent.ExecutionContext.Implicits.global

/**
 * @author mebinum
 *
 */
class HttpRequestActor extends Actor {
    //initialize response with default uninitialized value
    private var resp:Response = _
    private val executor = Executors.newCachedThreadPool
    private val executor2 = Executors.newCachedThreadPool
    private val factory = new NioClientSocketChannelFactory(
                          executor,
                          executor2);

    private val allChannels = new DefaultChannelGroup("httpRequester")

    def act = loop {
        react {
            case RequestPage(uri) => requestUri(uri)
            case Reply(msg) => setResponse(Reply(msg))
            case NoReply => println("didnt get a reply");setResponse(NoReply)
            case NotReadable => println("got a reply but its not readable");setResponse(NotReadable)
            case ShutDown => shutDown()
        }
    }

    private def requestUri(uri:URI) = {

      makeChannel(uri) map {
          channel => {
              allChannels.add(channel)
              val request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, uri.toString)
              request.setHeader(HttpHeaders.Names.HOST, uri.getHost())
              request.setHeader(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.CLOSE)
              request.setHeader(HttpHeaders.Names.ACCEPT_ENCODING, HttpHeaders.Values.GZIP)

              val writeFuture = channel.write(request).awaitUninterruptibly()

              FutureReactor !? writeFuture match {
                  case future : ChannelFuture => {
                      future.addListener(new ChannelFutureListener() {
                          def operationComplete(future:ChannelFuture) {
                              // Perform post-closure operation
                              println("current response is " + resp)
                              sendResponse("look ma I finished")
                          }
                      })
                      future.getChannel().close()
                  }
              }

              this ! ShutDown
          }
      }
      //thread ends only if you send a reply from here
      //println("this is final sender " + sender)
      //reply("I am the true end")
    }

    private def makeChannel(uri:URI) = {
      val scheme = Some(uri.getScheme()).getOrElse("http")
      val host = Some(uri.getHost()).getOrElse("localhost")

      val port = Utils.getPort(uri.getPort, uri.getScheme)

      // Set up the event pipeline factory.
      val client = new ClientBootstrap(factory)
      client.setPipelineFactory(new PipelineFactory(this))

      //get the promised channel
      val channel = NettyFutureBridge(client.connect(new InetSocketAddress(host, port)))
      channel  
    }

    private def setResponse(aResponse:Response) = resp = aResponse

    private def sendResponse(msg:String) = {
      println("Sending the response " + msg)
      reply(resp)
    }

    private def shutDown() = {
        println("got a shutdown message")
        val groupFuture = allChannels.close().awaitUninterruptibly()
        factory.releaseExternalResources()
    }

    override def exceptionHandler = {
      case e : CancellationException => println("The request was cancelled"); throw e
      case tr: Throwable => println("An unknown exception happened " + tr.getCause()); throw tr
    }
}



trait Response
case class RequestPage(url:URI)

case class Reply(content:String) extends Response
case object NoReply extends Response
case object NotReadable extends Response
case object ShutDown

object FutureReactor extends Actor{
  def act = //loop {
      react {
        case future: ChannelFuture => {
            if (future.isCancelled) {
                throw new CancellationException()
            }
            if (!future.isSuccess()) {
                future.getCause().printStackTrace()
                throw future.getCause()
            }
            if(future.isSuccess() && future.isDone()){
                future.getChannel().getCloseFuture().awaitUninterruptibly()
                reply(future)
            }
        }
      }
    //}
  this.start
}


class ClientHandler(listener:Actor) extends SimpleChannelUpstreamHandler {

  override def exceptionCaught( ctx:ChannelHandlerContext, e:ExceptionEvent){
    e.getCause().printStackTrace()
    e.getChannel().close();
    throw e.getCause()
  }

  override def messageReceived(ctx:ChannelHandlerContext,  e:MessageEvent) = {
        var contentString = ""
        var httpResponse:Response =  null.asInstanceOf[Response]

        e.getMessage match {
          case (response: HttpResponse) if !response.isChunked => {
              println("STATUS: " + response.getStatus);
              println("VERSION: " + response.getProtocolVersion);
              println

              val content = response.getContent();
              if (content.readable()) {
                  contentString = content.toString(CharsetUtil.UTF_8)
                  httpResponse = Reply(contentString)
                  //notify actor

              }else{
                 httpResponse = NotReadable
              }
          }
          case chunk: HttpChunk if !chunk.isLast => {
            //get chunked content
            contentString = chunk.getContent().toString(CharsetUtil.UTF_8)
            httpResponse = Reply(contentString)
          }
          case _ => httpResponse = NoReply
        }
         println("sending actor my response")
         listener ! httpResponse
         println("closing the channel")
         e.getChannel().close()
         //send the close event

    }


}


class PipelineFactory(listener:Actor) extends ChannelPipelineFactory {

    def  getPipeline(): ChannelPipeline = {
            // Create a default pipeline implementation.
            val pipeline = org.jboss.netty.channel.Channels.pipeline()

            pipeline.addLast("codec", new HttpClientCodec())

            // Remove the following line if you don't want automatic content decompression.
            pipeline.addLast("inflater", new HttpContentDecompressor())

            // Uncomment the following line if you don't want to handle HttpChunks.
            //pipeline.addLast("aggregator", new HttpChunkAggregator(1048576))

            pipeline.addLast("decoder", new HttpRequestDecoder())
            //assign the handler
            pipeline.addLast("handler", new ClientHandler(listener))

            pipeline;
    }
}


object NettyFutureBridge { 
  import scala.concurrent.{ Promise, Future }
  import scala.util.Try
  import java.util.concurrent.CancellationException 
  import org.jboss.netty.channel.{ Channel, ChannelFuture, ChannelFutureListener }

  def apply(nettyFuture: ChannelFuture): Future[Channel] = { 
    val p = Promise[Channel]() 
    nettyFuture.addListener(new ChannelFutureListener { 
      def operationComplete(future: ChannelFuture): Unit = p complete Try( 
        if (future.isSuccess) {
          println("Success")
          future.getChannel
        }
        else if (future.isCancelled) {
          println("Was cancelled")
          throw new CancellationException 
        }

        else {
          future.getCause.printStackTrace()
          throw future.getCause
        })
    }) 
    p.future 
  }
} 

测试代码

val url = "http://hiverides.com"

test("Http Request Actor can recieve and react to message"){
    val actor = new HttpRequestActor()
    actor.start

    val response = actor !? new RequestPage(new URI(url)) 
    match {
      case Reply(msg) => {
          println("this is the reply response in test")
          assert(msg != "")
          println(msg)
        }
      case NoReply => println("Got No Reply")
      case NotReadable => println("Got a not Reachable")
      case None => println("Got a timeout")
      case s:Response => println("response string \n" + s)
      case x => {println("Got a value not sure what it is"); println(x);}

    }
  }

所使用的库: - Scala 2.9.2 - Netty.io 3.6.1.Final - Junit 4.7 - scalatest 1.8 - 我还使用了@viktorklang的NettyFutureBridge对象gist来创建返回Channel对象的Scala future

如何将Netty的响应内容发送回Actor对象并结束线程?

非常感谢您的帮助


谢谢你的链接 Dylan,这个库看起来很全面,但我仍然想要一个简单的解决方案,并真正理解我做错了什么。 - Mike E.
@MikeE。你是在使用Scala 2.10.RC还是Scala 2.9.2?我在Scala 2.9.2中找不到scala.concurrent.*。从你的代码示例中,我能够理解的是,你正在将http响应(来自netty messageReceived()回调)设置为var resp: Response,并尝试在channel.write() future上将其回复给原始发送者。问题是channel.write() future在messageReceived()回调之前完成了。我可以看到一些不必要的channel.close(),由于你有Shutdown()消息,为什么需要在多个地方调用它。 - Jestan Nirojan
嘿@JestanNirojan,我正在使用2.9.2,但我可能要添加一些对Scalatra的依赖,所以如果它不在标准库中,它可能来自那里。var resp:Response实际上不是HttpResponse,而是我创建的一个case类。当调用的actor(我传递给ClientHandler)接收到Reply(msg)时,我设置了resp的值。这很有效,因为我能够得到响应并显示Reply(msg)的值。是的,你可能对过多的关闭调用是正确的,这只是我试图让它工作。 - Mike E.
@MikeE。这是我使用Akka创建Netty角色的尝试 :),http://git.io/KSXjjA - Jestan Nirojan
代码可以运行,我能够得到回复,将其发送到我的actor实例,打印出内容并向actor实例发送释放资源的消息。问题在于当我测试它时,原始对actor的调用没有得到回复,线程仍然保持开启状态。为什么说代码可以运行,如果测试失败,你是如何尝试使用它来说明它可以运行的? - Edmondo
显示剩余10条评论
1个回答

0
我不懂Scala,但我有过类似的问题。尝试指定响应的content-length头部。
用普通Java表达:
HttpRequest r = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, uri);
            ChannelBuffer buffer = ChannelBuffers.copiedBuffer(input);
            r.setHeader(HttpHeaders.Names.HOST, "host");
            r.setHeader(HttpHeaders.Names.CONTENT_TYPE, "application/octet-stream");
            r.setHeader(HttpHeaders.Names.CONTENT_LENGTH, buffer.readableBytes());
            r.setHeader(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.CLOSE);
            r.setContent(buffer);

否则,服务器无法知道客户端何时完成内容,除非客户端关闭连接。
您还可以使用分块编码,但是您必须自己实现分块编码(至少我不知道Netty中是否有库可以实现它)。

感谢@aaron的回复,我尝试了一下但没有什么改变。我认为问题可能实际上与我的Scala actor代码有关,而不是与我的netty实现有关。 - Mike E.

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接