在Java中并行执行依赖任务

16

我需要找到一种在Java中并行执行任务(独立的和依赖的)的方法。

  1. 任务A和任务C可以独立运行。
  2. 任务B依赖于任务A的输出。

我已经查看了Java的java.util.concurrent Future和Fork/Join,但似乎我们无法为任务添加依赖。

请问有人能指向正确的Java API吗?


你是否考虑过在任务A完成时通知任务B?在开始任务A之前,实例化任务B并将其添加为任务A的观察者(参见观察者模式)。 - David W
Guava的ListenableFuture在这些方面比普通的Futures更加友好。 - Louis Wasserman
8个回答

13

在Scala中,这个任务非常容易完成,我认为你最好使用Scala。以下是一个我从这里http://danielwestheide.com/找到的例子(Scala新手指南第16部分:从这里开始),这位博主写得很棒(我不是那个人)。

我们来看看一个咖啡师制作咖啡的任务:

  1. 磨所需的咖啡豆(没有前置任务)
  2. 加热一些水(没有前置任务)
  3. 使用磨好的咖啡豆和加热的水冲泡浓缩咖啡(依赖于1和2)
  4. 打些奶泡(没有前置任务)
  5. 将奶泡和浓缩咖啡混合(依赖于3和4)

或者用树形图表示:

Grind   _
Coffe    \
          \   
Heat    ___\_Brew____ 
Water                \_____Combine
                     /
Foam    ____________/
Milk

在Java中使用并发API,这将是:

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class Barrista {

    static class HeatWater implements Callable<String> {
        @Override
        public String call() throws Exception {
            System.out.println("Heating Water");
            Thread.sleep(1000);
            return "hot water";
        }
    }

    static class GrindBeans implements Callable<String> {
        @Override
        public String call() throws Exception {
            System.out.println("Grinding Beans");
            Thread.sleep(2000);
            return "grinded beans";
        }
    }

    static class Brew implements Callable<String> {

        final Future<String> grindedBeans;
        final Future<String> hotWater;

        public Brew(Future<String> grindedBeans, Future<String> hotWater) {
            this.grindedBeans = grindedBeans;
            this.hotWater = hotWater;
        }

        @Override
        public String call() throws Exception
        {
            System.out.println("brewing coffee with " + grindedBeans.get()
                    + " and " + hotWater.get());
            Thread.sleep(1000);
            return "brewed coffee";
        }
    }

    static class FrothMilk implements Callable<String> {

        @Override
        public String call() throws Exception {
            Thread.sleep(1000);
            return "some milk";
        }
    }

    static class Combine implements Callable<String> {

        public Combine(Future<String> frothedMilk, Future<String> brewedCoffee) {
            super();
            this.frothedMilk = frothedMilk;
            this.brewedCoffee = brewedCoffee;
        }

        final Future<String> frothedMilk;
        final Future<String> brewedCoffee;

        @Override
        public String call() throws Exception {
            Thread.sleep(1000);
            System.out.println("Combining " + frothedMilk.get() + " "
                    + brewedCoffee.get());
            return "Final Coffee";
        }

    }

    public static void main(String[] args) {

        ExecutorService executor = Executors.newFixedThreadPool(2);

        FutureTask<String> heatWaterFuture = new FutureTask<String>(new HeatWater());
        FutureTask<String> grindBeans = new FutureTask<String>(new GrindBeans());
        FutureTask<String> brewCoffee = new FutureTask<String>(new Brew(grindBeans, heatWaterFuture));
        FutureTask<String> frothMilk = new FutureTask<String>(new FrothMilk());
        FutureTask<String> combineCoffee = new FutureTask<String>(new Combine(frothMilk, brewCoffee));

        executor.execute(heatWaterFuture);
        executor.execute(grindBeans);
        executor.execute(brewCoffee);
        executor.execute(frothMilk);
        executor.execute(combineCoffee);


        try {

            /**
             *  Warning this code is blocking !!!!!!!
             */         
            System.out.println(combineCoffee.get(20, TimeUnit.SECONDS));
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            System.out.println("20 SECONDS FOR A COFFEE !!!! I am !@#! leaving!!");
            e.printStackTrace();
        } finally{
                executor.shutdown();
            }
        }
    }

确保添加超时时间,以确保您的代码不会永远等待某些操作完成。可通过使用Future.get(long, TimeUnit)实现,然后根据情况处理失败。

在Scala中更好,下面是博客中的示例代码:要准备一些咖啡的代码看起来像这样:

def prepareCappuccino(): Try[Cappuccino] = for {
  ground <- Try(grind("arabica beans"))
  water <- Try(heatWater(Water(25)))
  espresso <- Try(brew(ground, water))
  foam <- Try(frothMilk("milk"))
} yield combine(espresso, foam)

所有方法都会返回一个 Future(类型化的 Future),例如 grind 的实现可能如下所示:

def grind(beans: CoffeeBeans): Future[GroundCoffee] = Future {
   // grinding function contents
}

有关所有实现,请查看博客,但就是这样。您也可以轻松集成Scala和Java。我真的建议在Scala而不是Java中执行此类操作。Scala需要更少的代码,更干净和事件驱动。


4

1

1

0

有一个专门用于此目的的Java库(免责声明:我是该库的所有者)叫做Dexecutor

以下是您可以实现所需结果的方法,您可以在这里了解更多信息

@Test
public void testDependentTaskExecution() {

    DefaultDependentTasksExecutor<String, String> executor = newTaskExecutor();

    executor.addDependency("A", "B");
    executor.addIndependent("C");

    executor.execute(ExecutionBehavior.RETRY_ONCE_TERMINATING);

}

private DefaultDependentTasksExecutor<String, String> newTaskExecutor() {
    return new DefaultDependentTasksExecutor<String, String>(newExecutor(), new SleepyTaskProvider());
}

private ExecutorService newExecutor() {
    return Executors.newFixedThreadPool(ThreadPoolUtil.ioIntesivePoolSize());
}

private static class SleepyTaskProvider implements TaskProvider<String, String> {

    public Task<String, String> provid(final String id) {

        return new Task<String, String>() {

            @Override
            public String execute() {
                try {
                    //Perform some task
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                String result = id + "processed";
                return result;
            }

            @Override
            public boolean shouldExecute(ExecutionResults<String, String> parentResults) {
                ExecutionResult<String, String> firstParentResult = parentResults.getFirst();
                //Do some logic with parent result
                if ("B".equals(id) && firstParentResult.isSkipped()) {
                    return false;
                }
                return true;
            }
        };          
    }

}

0

0
如果任务B依赖于任务A的输出,我首先会质疑任务B是否真的是一个单独的任务。如果存在以下情况,则将任务分开是有意义的:
  • 任务B可以在需要任务A的结果之前完成一些非平凡的工作
  • 任务B是一个长时间运行的过程,处理来自许多不同任务A实例的输出
  • 还有其他任务(比如D)也使用任务A的结果
假设它是一个单独的任务,那么您可以允许任务A和B共享一个BlockingQueue,以便任务A可以传递任务B数据。

0
你需要的是一个 CountDownLatch
final CountDownLatch gate = new CountDownLatch(2);
// thread a
new Thread() {
    public void run() {
        // process
        gate.countDown();
    }
}.start();

// thread c
new Thread() {
    public void run() {
        // process
        gate.countDown();
    }
}.start();

new Thread() {
    public void run() {
        try {
            gate.await();
            // both thread a and thread c have completed
            // process thread b
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }
}.start();

作为一种替代方案,根据您的情况,您还可以使用BlockingQueue来实现生产者-消费者模式。请参见文档页面上的示例。

这里使用 CountDownLatch 有些过度,根据 OP 的说法,任务 B 仅依赖于任务 A,而不是任务 A 和 C。话虽如此,在代码片段中 -1 只是因为没有正确处理 InterruptedException - Tim Bender
1
谢谢,代码片段的想法是向他展示CountDownLatch的工作原理,而不是向他展示如何正确处理异常。 - Jeshurun

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接