从Java并发迁移到Scala并发

6

我在Java中有一个解决问题的标准机制:

  • 必须安排工作项在特定时间执行
  • 每个工作项都必须等待条件变为真
  • 工作项应该是可取消的

我的解决方案如下:

  1. 有一个单线程调度程序来安排我的工作项
  2. 有一个ExecutorService(可以是多线程的)
  3. 然后,每个计划的工作项将实际工作提交给ExecutorService。返回的Future被缓存在映射中。完成服务用于在完成工作时从缓存中删除未来
  4. 可以通过缓存的futures取消项目

当然,我的执行程序需要至少与我预期拥有的阻塞工作项一样大,但在实践中这不是问题。

现在我正在使用Scala并使用actor框架进行编码。假设我的工作项可以封装在发送到actor的事件中:

  1. 我将使用什么机制安排特定时间的工作项?
  2. 如果工作项是发送到actor的事件,如何确保后备线程池比同时阻塞的项目数量更大
  3. 如何导致先前安排的工作项被取消?

你能详细解释一下“条件成立”的意思吗?这是全局状态吗?还是I/O? - Apocalisp
这只是意味着作为工作的一部分,进程可能需要在某些事件发生时(比如文件到达)阻塞。 - oxbow_lakes
2个回答

5

如何安排一个具体时间的工作任务?

我会使用java.util.concurrent.ScheduledExecutorService。

如果一个工作项是发送给actor的事件,如何确保后台线程池大于可以同时阻塞的项目数量?

这似乎是一种破坏并行化努力的设计。尝试最小化或消除阻塞和全局状态。这些都是组合性和可扩展性的障碍。例如,考虑有一个单独的线程等待文件到达,然后将事件发送给actors。或者看看java.nio用于异步非阻塞I/O。

我不完全了解你的要求,但似乎你可以有一个单一的线程/actor查找I/O事件。然后将计划的“工作项”作为产生非阻塞actors的效果进行调度。让这些actors向I/O线程/actor注册,以接收他们关心的I/O事件的消息。

如何取消先前安排的工作项?

ScheduledExecutorService返回Futures。在这方面,您拥有的设计并不差。将它们收集到Map中并调用future.cancel()。


你关于“阻塞工作”模型不适合的观点是完全正确的,我自己也基本上得出了同样的结论,但一直在推迟进行完整重写。然而,最终我在上周五终于开始了这个工作,并且现在拥有了一个更好的系统。 - oxbow_lakes

1
你可以创建一个调度Actor,它有一个预定Actor列表,并使用Actor.receiveWithin()每秒钟唤醒一次并向准备执行的Actor发送消息。调度Actor也可以处理取消操作。另一个选择是让每个Actor直接使用receiveWithin()来处理自己的调度,而不是集中调度。
在博客文章Scala中简单的类似cron的调度器中对此问题进行了讨论。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接