我如何创建块感知执行上下文?

7

由于某些原因,我无法理解如何实现这个功能。我有一个应用程序运行,其中包含调用Elastic SearchPlay。作为我的设计的一部分,我的服务使用Java API,通过scala future进行封装,如博客文章所示。我已经更新了代码,以提示ExecutionContext将执行一些阻塞I/O操作,如下所示:

    import scala.concurent.{blocking, Future, Promise}
    import org.elasticsearch.action.{ActionRequestBuilder, ActionListener, ActionResponse }
    def execute[RB <: ActionRequestBuilder[_, T, _, _]](request: RB): Future[T] = {
        blocking {
            request.execute(this)
            promise.future
        }
    }

我的实际服务会构造查询并将其发送到ES,它会将executionContext作为构造函数参数使用,然后用于对elastic search的调用。我这样做是为了避免play使用的全局执行上下文受到对ES的阻塞调用而被阻塞。这个S.O.评论提到只有全局上下文具有阻塞感知能力,因此我必须自己创建一个。在同一篇帖子/答案中,有很多关于使用ForkJoin池的信息,但我不确定如何将这些文档中的内容与阻塞文档中的提示结合起来,以创建一个响应阻塞提示的执行上下文。

我认为我的一个问题是,我不确定如何在第一时间对阻塞上下文做出响应?我正在阅读最佳实践,它使用的示例是一个无界线程缓存:

请注意,我更喜欢使用无界“缓存线程池”,因此它没有限制。当进行阻塞I/O时,想法是您必须拥有足够的线程来阻塞。但是,如果无限制太多,根据用例,您可以稍后进行微调,这个示例的想法是让您开始工作。

那么,这是否意味着在使用我的ForkJoin支持的线程池时,当处理非阻塞I/O时应该尝试使用缓存线程,并为阻塞I/O创建新线程?还是其他什么?我在网上找到的几乎所有关于使用单独线程池的资源都倾向于像初学者指南所做的那样:如何调整各种线程池高度依赖于您的个人应用程序,超出了本文的范围。
我知道它取决于您的应用程序,但在这种情况下,如果我只想创建某种阻塞感知ExecutionContext并了解管理线程的良好策略。如果Context专门用于应用程序的某个部分,那么我应该只制定一个固定的线程池大小,而不使用/忽略首先出现的blocking关键字吗?
我倾向于冗长,因此我将尝试分解我在答案中寻找的内容:
  1. 编程!阅读所有这些文档仍然让我感觉像是离能够编写一个阻塞感知上下文还有一步之遥,我真的很需要一个例子。
  2. 有关如何处理阻塞线程的任何链接或提示,例如为它们创建一个无限的新线程、检查可用线程数量并在太多时拒绝,或者其他策略。
  3. 我不是在寻找性能提示,我知道只有通过测试才能获得那个,但如果我无法弄清楚如何首先编写上下文,我就无法进行测试!我找到了 ForkJoins vs threadpools 的示例,但我缺少关于blocking的重要部分。

对于这个冗长的问题,我很抱歉,我只是想让您了解我正在看什么,并且我已经试图理解这个问题超过一天,需要一些外部帮助。


编辑:仅为明确起见,ElasticSearch服务的构造函数签名是:

//Note that these are not implicit parameters!
class ElasticSearchService(otherParams ..., val executionContext: ExecutionContext)

在我的应用程序启动代码中,我有类似这样的东西:
object Global extends GlobalSettings {
    val elasticSearchContext = //Custom Context goes here
    ...
    val elasticSearchService = new ElasticSearchService(params, elasticSearchContext);
    ...
}

我也在阅读Play的上下文建议,但尚未看到有关阻止提示的任何内容,我怀疑我可能需要查看源代码以查看它们是否扩展了BlockContext特质。


我建议避免尝试编写自己的执行上下文。只需设置一个基于线程池的执行器,具有适当的最小和最大线程计数,并将其用于对ES的调用即可。这将从主执行上下文中实质上隔离它,以便其阻塞行为不会影响该主上下文。 - cmbaxter
@cmbaxter 所以就像这样 val executorService = Executors.newFixedThreadPool(someNumberFromConf); val executionContext = ExecutionContext.fromExecutorService(executorService) ? 我有点困惑当一个执行上下文改变时会发生什么,我的控制器代码使用全局隐式来响应 Action.async,但控制器本身调用服务,该服务将使用另一个上下文。你有任何关于Scala如何处理这个问题的想法吗?我不禁要想系统是如何在上下文之间切换的 - EdgeCaseBerg
你说你的ES调用服务以ExecutionContext作为构造函数参数。在那段代码中,当调用ES时,只需要确保使用了一个即可。在那个特定的Scala文件中,不要导入全局隐式的ExecutionContext。我想你使用调用ES的库需要一个ExecutionContext,只需确保使用这个即可。 - cmbaxter
没错,我没有导入全局隐式参数到我的ES服务中。我使用的是Java ES API代码,它是阻塞的,所以我像博客文章中描述的那样将其包装在Scala Future promises中。在应用程序启动时,服务已经明确创建了上下文,我会更新我的问题并附上一些代码来说明这一点。 - EdgeCaseBerg
1个回答

3
所以我查阅了文档和Play最佳实践,针对我正在处理的情况,最好的做法是:

在某些情况下,您可能希望将工作分派给其他线程池。这可能包括CPU密集型工作或IO工作,例如数据库访问。为此,您应该首先创建一个线程池,这可以很容易地在Scala中完成:

并提供一些代码:

object Contexts {
    implicit val myExecutionContext: ExecutionContext = Akka.system.dispatchers.lookup("my-context")
}

上下文是来自Akka的,所以我去那里搜索默认值和上下文类型,最终导致我找到了调度程序文档。默认情况下是一个ForkJoinPool,它管理块的默认方法是调用managedBlock(blocker)。这使我阅读了说明文件,其中指出:

根据给定的阻塞器进行阻止。如果当前线程是ForkJoinWorkerThread,则此方法可能会安排激活备用线程(如果需要)以确保当前线程被阻止时有足够的并行性。

因此,看起来如果我有一个ForkJoinWorkerThread,那么我认为想要的行为将发生。再次查看ForkJoinPool的源代码,我注意到默认的线程工厂是:

val defaultForkJoinWorkerThreadFactory: ForkJoinWorkerThreadFactory = juc.ForkJoinPool.defaultForkJoinWorkerThreadFactory

这意味着如果我使用Akka中的默认设置,我将得到一个处理阻塞的上下文,其方式符合我的预期。
因此,再次阅读Akka文档,似乎应该像这样指定我的上下文:
my-context {
  type = Dispatcher
  executor = "fork-join-executor"
  fork-join-executor {
    parallelism-min = 8
    parallelism-factor = 3.0
    parallelism-max = 64
    task-peeking-mode = "FIFO"
  }
  throughput = 100
}

我希望你能翻译中文,这段内容与编程有关。您需要修改内容以使其更易于理解,但请保留HTML标记。以下是需要翻译的内容:

这就是我想要的。

当我在查找源代码时,我查找了blocking或调用managedBlock的用法,并在ThreadPoolBuilder中找到了覆盖ForkJoin行为的示例。

private[akka] class AkkaForkJoinWorkerThread(_pool: ForkJoinPool) extends ForkJoinWorkerThread(_pool) with BlockContext {
    override def blockOn[T](thunk: ⇒ T)(implicit permission: CanAwait): T = {
      val result = new AtomicReference[Option[T]](None)
      ForkJoinPool.managedBlock(new ForkJoinPool.ManagedBlocker {
        def block(): Boolean = {
          result.set(Some(thunk))
          true
        }
        def isReleasable = result.get.isDefined
      })
      result.get.get // Exception intended if None
    }
  }

这似乎就是我最初要求的实现BlockContext示例。该文件还展示了如何创建ExecutorServiceFactory,我相信配置文件中executor部分引用的就是它。因此,如果我想要完全自定义上下文,我会继承某种WorkerThread类型并编写自己的ExecutorServiceFactory,使用自定义workerthread,然后在属性中指定完全限定的类名,就像this post advises中所建议的那样。

我可能会选择使用Akka的forkjoin :)


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接