循环信息/任务队列现有解决方案

6
考虑有一组有限的任务需要在特定时间内完成(这些任务在该时间段内均匀分布),并且需要重复执行。
对于一个本地工作者/线程,我们只需像这样做(对于伪代码表示抱歉):
long interval = period / tasks.size

while (true) {
  for (task in tasks) { 
    task.do()
    sleep(interval)
  }
}

现在我想以分布式方式完成这个任务,使用多个独立的工作者。
是否有已知的最佳实践解决方案(最好来自Java世界)?循环消息队列?任务的分布式锁?我已经搜索了很多,但是似乎没有优雅的开箱即用的解决方案。

为什么不使用定时器任务调度来重复执行间隔? - JineshEP
@user1653941 要做什么具体的事情? - Alexander Eliseyev
为了重复安排指定的任务,即固定延迟执行 - 可以使用TimerTask和schedule方法,如https://docs.oracle.com/javase/7/docs/api/java/util/Timer.html中所述。 - JineshEP
1
你的代码需要什么保证?两个任务可以(部分)并行运行吗?任务之间是否存在依赖关系?当任务执行时间过长时会发生什么? - Jens
注意:您的时间间隔假定任务根本不需要时间。如果是这种情况,您只需要一个线程而不需要多个工作线程。只有当一个任务可以在上一个任务完成之前开始时,才需要多个工作线程。 - Peter Lawrey
显示剩余2条评论
3个回答

4
我认为没有一种单一的“最佳”方法,因为这将是在灵活性和均匀性保证的强度之间进行权衡。
在灵活性方面,只需在某个时间随机分配每个任务给一个工人。这不需要工人之间通信,因此可扩展和并行化。如果你有很多任务,你会有一些期望事情应该相当均匀。
在强制均匀性端,你应该按@Lino所示,将任务按工人和时间段划分。这将要求你预先知道你有多少工人等等,并且不能并行化。
还有许多介于这两个极端之间的替代方法,以及混合方法。
为了缩小答案范围,您需要提供有关限制条件和成功标准的更多细节。

2

下面是我想出来的代码。我尽量在每个步骤中添加了注释,但简而言之,它只是将所有任务的工作负载均匀分配给所有可用的工作者,并计算在给定的毫秒数内完成所有任务所需的等待时间。

// the tasks we want to execute
final List<Runnable> tasks = Arrays.asList(
    () -> System.out.println("First"),
    () -> System.out.println("Second"),
    () -> System.out.println("Third"),
    () -> System.out.println("Fourth")
);

// amount of threads
final int amountOfWorkers = 2;

// period in milliseconds
final int period = 1000;

// caching the size for multiple use
final int tasksSize = tasks.size();

// calculating the workload of each worker
final int workLoad = (int) Math.ceil((double) tasksSize / amountOfWorkers);

// interval of sleep for each worker
final int workerPeriod = period / workLoad;

// a list of all workers
final List<Thread> workers = new ArrayList<>();

// in this for loop we create each worker and add it to above list
for(int i = 0; i < amountOfWorkers; i++){
    // calculating the start of the sublist
    final int startIndex = i * workLoad;
    // calculating the end of the sublist
    final int endIndex = (i + 1) * workLoad;
    // here we create the subTasks for each worker, we need to take into account that the tasksList may not 
    // divide fully. e.g. 7 for 4 workers leaves the last worker with only one task
    final List<Runnable> subTasks = tasks.subList(startIndex, endIndex < tasksSize ? endIndex : tasksSize);
    // creating the worker itself
    final Thread worker = new Thread(() -> {
        for(final Runnable subTask : subTasks){
            try{
                subTask.run();
                Thread.sleep(workerPeriod);
            } catch(InterruptedException e){
                throw new IllegalStateException(e);
            }
        }
    });

    // add it to our worker list
    workers.add(worker);
    // and start it
    worker.start();
}

// at last we wait in the main thread for all workers to finish
for(final Thread worker : workers){
    worker.join();
}

当然,这可以放到一个类中,该类接受输入参数,如:

  • 工人数量
  • 执行时间
  • 任务

这将更符合面向对象编程(OOP)的要求。如果需要,我可以提供该代码,但上述内容应该能给你一个大致的想法。


-1

您可以使用Java-8流来运行stream.paralle(),如下所示:

    List<Task> tasks = new ArrayList<>();
    Stream.of(tasks).parallel().forEach(t -> t.doSomething());

这将为您使用所有的CPU资源。

如果您需要更像集群的解决方案,即在网络上运行,您有不同的选择,但它们都涉及一些框架。

我个人更喜欢Vert.x,它很容易设置一个集群,并使用事件总线分配您的工作负载。其他简单的选项是Spring。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接