使用超时对`Future`进行排序

4

我使用了在Scala Futures - built in timeout?中介绍的TimeoutScheduler

然而,现在我的程序没有TimeoutScheduler之前那样终止。

我有两个Futureres1res2。它们都有15秒的超时时间。最后,我按顺序对这两个Future进行合并,以便在onComplete回调中正确关闭HTTP执行器。如果不使用withTimeout,程序将在http.shutdown之后立即终止。但是,如果使用withTimeout,则不会。为什么?可能还有其他的future...

import java.net.URI
import scala.util.{ Try, Failure, Success }
import dispatch._
import org.json4s._
import org.json4s.native.JsonMethods._
import com.typesafe.scalalogging.slf4j.Logging

object Main extends App with Logging {
  import scala.concurrent.ExecutionContext.Implicits.global

  val http   = Http
  val github = host("api.github.com").secure 

  import timeout._
  import scala.concurrent.duration._
  import scala.language.postfixOps
  import scala.collection.JavaConverters._

  val req: dispatch.Req =  github / "users" / "defunkt"
  val res1 = http(req > dispatch.as.Response(_.getHeaders().get("Server").asScala)) withTimeout (15 seconds) recover { case x => 
    logger.debug("Error1: " + x.toString) 
    Nil
  }
  val res2 = http(req > dispatch.as.Response(_.getHeaders().get("Vary").asScala)) withTimeout (15 seconds) recover { case x => 
    logger.debug("Error2: " + x.toString) 
    Nil
  }

  Future.sequence(res1 :: res2 :: Nil) onComplete { case _ => 
    http.shutdown()               // without using `withTimeout` the program terminated after `http.shutdow`
    TimeoutScheduler.timer.stop() // thanks to @cmbaxter
  }
}

object timeout {
  import java.util.concurrent.TimeUnit
  import scala.concurrent.Promise
  import scala.concurrent.duration.Duration
  import org.jboss.netty.util.Timeout
  import org.jboss.netty.util.TimerTask
  import org.jboss.netty.util.HashedWheelTimer
  import org.jboss.netty.handler.timeout.TimeoutException

   // cf. https://dev59.com/m2Qo5IYBdhLWcg3wI8VV
  object TimeoutScheduler {
    val timer = new HashedWheelTimer(10, TimeUnit.MILLISECONDS)
    def scheduleTimeout(promise: Promise[_], after: Duration) = {
      timer.newTimeout(new TimerTask{
        def run(timeout: Timeout){              
          promise.failure(new TimeoutException("Operation timed out after " + after.toMillis + " millis"))        
        }
      }, after.toNanos, TimeUnit.NANOSECONDS)
    }
  }
  implicit class FutureWithTimeout[T](f: Future[T]) {
    import scala.concurrent.ExecutionContext

    def withTimeout(after: Duration)(implicit ec: ExecutionContext) = {
      val prom = Promise[T]()
      val timeout = TimeoutScheduler.scheduleTimeout(prom, after)
      val combinedFut = Future.firstCompletedOf(List(f, prom.future))
      f onComplete { case result => timeout.cancel() }
      combinedFut
    }
  } 
}

任何建议都受欢迎,祝好,/nm
1个回答

4

如果你按照我的代码描述完全使用,那么我的猜测是Netty下的HashedWheelTimer(哈希轮定时器)没有被终止。在调用http.shutdown后,你可以尝试显式地调用stop来停止它:

TimeoutScheduler.timer.stop

如果您想让Netty的HashedWheelTimer使用守护线程,那么可以使用其中一个构造函数(我在3.6.6-Final中看到了它们),该构造函数接受一个ThreadFactory,然后使用自定义的ThreadFactory将守护标志设置为true。


我只是稍微修改了你的代码。你认为这是问题吗?整个代码在我的最初帖子中。你的代码在最后。或者也许HashedWheelTimer不是守护进程(由Viktor Klang建议)? - nemron
是的,HashedWheelTimer线程是非守护进程的,因此如果您想将该类用作高效的计时器机制,则必须显式停止它。虽然可能有不同的选项可用于计时器类,但您也可以研究一下。我使用HashedWheelTimer只是为了完整性而已。如果您不喜欢明确关闭它,请不要感到必须使用它。 - cmbaxter
谢谢您的回复。我很愿意明确地关闭它,只是好奇是否真的需要这样做。感谢您的澄清。最后,出于教学目的,您知道我可以尝试的一种守护进程替代方案吗?以便更好地理解。祝好,/nm - nemron
看起来我可能说得太早了。Netty的HashedWheelTimer似乎支持一个构造函数,它接受一个ThreadFactory,用于生成表示计时器的线程。如果您使用该构造函数,应该能够传入自定义的ThreadFactory,将返回的Thread上的守护标志设置为true。 - cmbaxter
感谢您的帮助和调查!您是正确的!我改成了 new HashedWheelTimer(new DefaultDaemonicThreadFactory, 10, TimeUnit.MILLISECONDS),其中 DefaultDaemonicThreadFactoryExecutors#DefaultThreadFactory 的一个原始副本,但具有 t.setDaemon(true)。它有效了!在这种情况下,TimeoutScheduler.timer.stop() 不是必要的。 - nemron

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接