用一个返回值同步方法包装一系列异步调用

10

我的当前代码使用一系列异步进程来产生结果。我需要将每个进程包装在一个同步方法中,以便每个进程可以通过返回值访问其结果。我想使用执行器服务来实现这一点,以便允许同时发生多个进程。我觉得Future可能与我的实现相关,但我无法找到一个好的方法来实现它。

我现在拥有的:

public class DoAJob {
  ResultObject result;

  public void stepOne() {
    // Passes self in for a callback
    otherComponent.doStepOne(this);
  }

  // Called back by otherComponent once it has completed doStepOne
  public void stepTwo(IntermediateData d) {
    otherComponent.doStepTwo(this, d);
  }

  // Called back by otherComponent once it has completed doStepTwo
  public void stepThree(ResultObject resultFromOtherComponent) {
    result = resultFromOtherComponent;
  //Done with process
  }
}

这在内部运作得相当不错,但现在我需要将我的流程映射到一个带有返回值的同步方法中,例如:

public ResultObject getResult(){
  // ??? What goes here ???
}

有没有关于如何优雅地实现这个的好主意?


1
这个规范中缺少一件事情:你计划如何在最后将不同任务的ResultObject合并成一个? - Eyal Schneider
我不确定我理解这个问题。通常,该过程是由新的DoAJob.stepOne()启动的;stepTwo()方法是由otherComponent的回调启动的。在回调链的末端,ResultObject被正确地填充。任何中间数据位都会集成到最终结果中。 - irondwill
5个回答

9
如果你想把一个异步操作(执行回调函数)变成同步/阻塞的操作,你可以使用一个阻塞队列。如果需要,你可以将其包装在一个Future对象中。
1. 定义一个只能容纳一个元素的阻塞队列: ``` BlockingQueue blockingQueue = new ArrayBlockingQueue(1); ```
2. 启动你的异步进程(会在后台运行),并编写回调函数,使得当它完成时,将其结果添加到阻塞队列中。
3. 在前台/应用程序线程中,使用take()从队列中获取元素,这会阻塞直到有可用的元素为止: ``` Result result = blockingQueue.take(); ```
我以前写过类似的东西(前台线程需要阻塞以等待来自远程机器的异步响应),使用了类似Future的东西,你可以在这里找到示例代码here

SynchronousQueue 可能更适合这个目的。 - ZhongYu
2
我曾经遇到过这种情况(实际上是与我链接的代码)。如果存在竞争条件,使得后台线程在前台线程调用take()之前完成,则SynchronousQueue可能会导致后台线程阻塞或接收异常。如果像OP所提到的那样将其包装在Future中,如果应用程序在调用Future.get()之前执行了其他工作,则这可能是一个常见情况。 - npgall

1

0

我建议使用invokeAll(..)。它将向执行器提交一组任务,并阻塞直到最后一个任务完成(成功/异常)。然后它会返回已完成的Future对象列表,因此您可以在它们上面循环并将结果合并为单个ResultObject。

如果您希望以同步方式仅运行单个任务,则可以使用以下方法:

executor.invokeAll(Collections.singleton(task)); 

--编辑--

现在我认为我更好地理解了您的需求。我假设您需要一种提交独立任务序列的方法。请查看我在此答案中发布的代码。


我在我的示例中没有很好地说明这一点,但是步骤依赖于先前的步骤已经完成。例如,stepTwo() 在 stepOne() 远程完成后执行。 stepTwo() 实际上是在 stepOne() 中调用的组件的完成回调函数。 - irondwill

0
如果你喜欢动手实践,你可以这样做。
ResultObject result;

public void stepOne() 

    otherComponent.doStepOne(this);

    synchronized(this)
        while(result==null) this.wait();
    return result;

public void stepThree(ResultObject resultFromOtherComponent) 

    result = resultFromOtherComponent;

    synchronized(this)
        this.notify();

或者您可以使用更高级的并发工具,例如BlockingQueue、Semaphore、CountdownLatch、Phaser等等。

请注意,DoAJob不是线程安全的——如果两个线程同时调用stepOne,就会出现问题。


是的,我可能最终不得不这样做,因为它似乎是一个相当特定的用例和架构。我希望避免实现wait()循环或在我的代码中使用太多synchronized()调用。 - irondwill

0
Bumerang 是我的仅支持异步的 HTTP 请求库,专为使用 Java 的 Android HTTP 请求构建而成 -> https://github.com/hanilozmen/Bumerang。我需要进行同步调用而不影响我的库。这是我的完整代码。npgall 的答案给了我启示,谢谢!类似的方法也可以应用于各种异步库。
public class TestActivity extends Activity {

MyAPI api = (MyAPI) Bumerang.get().initAPI(MyAPI.class);
BlockingQueue<Object> blockingQueue = new ArrayBlockingQueue<Object>(1);

static int indexForTesting;

@Override
protected void onCreate(Bundle savedInstanceState) {
    super.onCreate(savedInstanceState);
    setContentView(R.layout.activity_test);
    Thread t = new Thread(new Runnable() {
        @Override
        public void run() {
            for(int i = 0; i < 10; i++) {
                getItems();
                try {
                    Object response = blockingQueue.take(); // waits for the response
                    Log.i("TAG", "index " + indexForTesting + " finished. Response " + response.toString());
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    });
    t.start();
}

void getItems() {
    Log.i("TAG", "index " + ++indexForTesting + " started");
    api.getItems(new ResponseListener<Response<List<ResponseModel>>>() {
        @Override
        public void onSuccess(Response<List<ResponseModel>> response) {
            List<ResponseModel> respModel = response.getResponse();
            try {
                blockingQueue.put(response);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        @Override
        public void onError(Response<List<ResponseModel>> response) {
            Log.i("onError", response.toString());
            try {
                blockingQueue.put(response);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    });
}

}


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接