有没有一种简单的方法将Future<Future<T>>转换为Future<T>?

20

我有一些代码,它向另一个线程提交请求,该线程可能会将该请求提交给另一个线程,也可能不会。 这会产生返回类型为 Future<Future<T>>。 是否有某种非常麻烦的方法可以立即将其转换为等待整个未来链完成的 Future<T>

我已经在使用Guava库处理其他有趣的并发事务,并且作为Google Collections的替代品已经运行良好,但是我似乎找不到适用于此情况的解决方案。


4
如果您能添加一些更多的上下文信息会很有帮助。显而易见的答案是调用get()函数,但这可能不是您想要的。 - Steve g
已完成。抱歉之前表达不太清晰。 - Nik
听起来你需要Monad。 - user
5个回答

7

另一个可能的实现方法是使用guava库,这种方法更加简单。

import java.util.concurrent.*;
import com.google.common.util.concurrent.*;
import com.google.common.base.*;

public class FFutures {
  public <T> Future<T> flatten(Future<Future<T>> future) {
    return Futures.chain(Futures.makeListenable(future), new Function<Future<T>, ListenableFuture<T>>() {
      public ListenableFuture<T> apply(Future<T> f) {
        return Futures.makeListenable(f);
      }
    });
  }
}

看起来这样做可以让我把所有的Future处理委托给guava。 - Nik

5
Guava 13.0增加了Futures.dereference,可以实现这一点。它需要一个ListenableFuture<ListenableFuture>,而不是一个普通的Future<Future>。(对于普通的Future操作需要进行makeListenable调用,每个调用都需要为任务的生命周期提供一个专用线程(正如该方法的新名称JdkFutureAdapters.listenInPoolThread所示)。)

1

我认为这是实现Future合同的最佳方法。我采取了尽可能不聪明的方法,以确保它符合合同要求。特别是get与超时的实现。

import java.util.concurrent.*;

public class Futures {
  public <T> Future<T> flatten(Future<Future<T>> future) {
    return new FlattenedFuture<T>(future);
  }

  private static class FlattenedFuture<T> implements Future<T> {
    private final Future<Future<T>> future;

    public FlattenedFuture(Future<Future<T>> future) {
      this.future = future;
    }

    public boolean cancel(boolean mayInterruptIfRunning) {
      if (!future.isDone()) {
        return future.cancel(mayInterruptIfRunning);
      } else {
        while (true) {
          try {
            return future.get().cancel(mayInterruptIfRunning);
          } catch (CancellationException ce) {
            return true;
          } catch (ExecutionException ee) {
            return false;
          } catch (InterruptedException ie) {
            // pass
          }
        }
      }
    }

    public T get() throws InterruptedException, 
                          CancellationException, 
                          ExecutionException 
    {
      return future.get().get();
    }

    public T get(long timeout, TimeUnit unit) throws InterruptedException, 
                                                     CancellationException, 
                                                     ExecutionException, 
                                                     TimeoutException 
    {
      if (future.isDone()) {
        return future.get().get(timeout, unit);
      } else {
        return future.get(timeout, unit).get(0, TimeUnit.SECONDS);
      }
    }

    public boolean isCancelled() {
      while (true) {
        try {
          return future.isCancelled() || future.get().isCancelled();
        } catch (CancellationException ce) {
          return true;
        } catch (ExecutionException ee) {
          return false;
        } catch (InterruptedException ie) {
          // pass
        }
      }
    }

    public boolean isDone() {
      return future.isDone() && innerIsDone();
    }

    private boolean innerIsDone() {
      while (true) {
        try {
          return future.get().isDone();
        } catch (CancellationException ce) {
          return true;
        } catch (ExecutionException ee) {
          return true;
        } catch (InterruptedException ie) {
          // pass
        }
      }
    }
  }
}

0
你可以创建一个类,例如:
public class UnwrapFuture<T> implements Future<T> {
    Future<Future<T>> wrappedFuture;

    public UnwrapFuture(Future<Future<T>> wrappedFuture) {
        this.wrappedFuture = wrappedFuture;
    }

    public boolean cancel(boolean mayInterruptIfRunning) {
        try {
            return wrappedFuture.get().cancel(mayInterruptIfRunning);
        } catch (InterruptedException e) {
            //todo: do something
        } catch (ExecutionException e) {
            //todo: do something
        }
    }
    ...
}

你将不得不处理get()方法可能引发的异常,而其他方法则不会。


1
那基本上就是我想要避免的。而且你那里的取消方法会让取消等待链中第一个未来完成。这绝对不是我要找的。 - Nik
2
将此转换为等待整个future链完成的Future<T>?我认为在获取第二个future之前无法取消它。但是在第一个future返回它之前,您无法获得它。 - Dave
干得好。虽然第二个Future是由第一个Future创建的,但我敢肯定你可能会遇到这样一种状态:你已经取消了第一个Future,但它仍然生成了第二个Future,而你无法取消它。我猜你可以通过使第一个Future可监听(Futures.makeListenable)并添加监听器,在返回时立即取消链式Future来修复这个问题。然后问题就变成如何测试这种情况。 - Nik

0

这是我第一次尝试,但我相信其中有很多错误。我很乐意用类似 Futures.compress(f) 的东西来替换它。

public class CompressedFuture<T> implements Future<T> {
    private final Future<Future<T>> delegate;

    public CompressedFuture(Future<Future<T>> delegate) {
        this.delegate = delegate;
    }

    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
        if (delegate.isDone()) {
            return delegate.cancel(mayInterruptIfRunning);
        }
        try {
            return delegate.get().cancel(mayInterruptIfRunning);
        } catch (InterruptedException e) {
            throw new RuntimeException("Error fetching a finished future", e);
        } catch (ExecutionException e) {
            throw new RuntimeException("Error fetching a finished future", e);
        }
    }

    @Override
    public T get() throws InterruptedException, ExecutionException {
        return delegate.get().get();
    }

    @Override
    public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        long endTime = System.currentTimeMillis() + unit.toMillis(timeout);
        Future<T> next = delegate.get(timeout, unit);
        return next.get(endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }

    @Override
    public boolean isCancelled() {
        if (!delegate.isDone()) {
            return delegate.isCancelled();
        }
        try {
            return delegate.get().isCancelled();
        } catch (InterruptedException e) {
            throw new RuntimeException("Error fetching a finished future", e);
        } catch (ExecutionException e) {
            throw new RuntimeException("Error fetching a finished future", e);
        }
    }

    @Override
    public boolean isDone() {
        if (!delegate.isDone()) {
            return false;
        }
        try {
            return delegate.get().isDone();
        } catch (InterruptedException e) {
            throw new RuntimeException("Error fetching a finished future", e);
        } catch (ExecutionException e) {
            throw new RuntimeException("Error fetching a finished future", e);
        }
    }
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接