如何以类型友好的方式包装可调用对象?

4

我正在尝试在Java中实现一个工作队列,它限制了每次可以处理的工作量。特别地,它试图保护对外部资源的访问。我的当前方法是使用Semaphore和BlockingQueue,以便我有类似这样的东西:

interface LimitingQueue<V> {
    void put(Callable<V> work);
    Callable<V> tryPoll();
}

它应该像这样运行:
@Test
public void workLimit() throws Exception {
    final int workQueue = 2;
    final LimitingQueue<Void> queue = new LimitingQueue<Void>(workQueue);
    queue.put(new Work()); // Work is a Callable<Void> that just returns null.
    queue.put(new Work());

    // Verify that if we take out one piece of work, we don't get additional work.
    Callable<Void> work = queue.tryPoll();
    assertNotNull(work, "Queue should return work if none outstanding");
    assertNull(queue.tryPoll(), "Queue should not return work if some outstanding");

    // But we do after we complete the work.
    work.call();
    assertNotNull(queue.tryPoll(), "Queue should return work after outstanding work completed");
}
tryPoll() 的实现使用了 Semaphore#tryAcquire,如果成功,就创建一个匿名的 Callable,将 Semaphore#release 调用包装在一个 try/finally 块中,该块围绕着对 work.call() 的调用。
这种方法虽然可行,但有点不尽如人意。因为如果这个类的用户放入的工作是某个特定的 Callable 实现类,则当查看 tryPoll 的结果时,用户无法访问该类。值得注意的是,tryPoll() 返回的是一个 Callable<Void>,而不是一个 Work
有没有一种方法可以实现工作限制效果,并在将提交的工作对象返回给调用者时提供一个可用的引用?(将 LimitingQueue 的类型签名加强成更像 LimitingQueue<R, T extends Callable<R>> 是可以的。)我想不出在不进行此类包装的情况下确保在调用工作项后释放信号量的方法。

不确定您想要什么。您可以展示一下您想要调用的代码,我们会尝试让它编译通过。 - Bohemian
2个回答

3

编辑2: 我已经用一个实现你所需功能的建议替换了原来的内容。如果你希望恢复部分旧信息,请告诉我。

public class MyQueue<T> {

  private Semaphore semaphore;

  public void put(Work<T> w) {
    w.setQueue(this);
  }

  public Work<T> tryPoll() {
    return null;
  }


  public abstract static class Work<T> implements Callable<T> {

    private MyQueue<T> queue;

    private void setQueue(MyQueue<T> queue) {
      if(queue != null) {
        throw new IllegalStateException("Cannot add a Work object to multiple Queues!");
      }
      this.queue = queue;
    }

    @Override
    public final T call() throws Exception {
      try {
        return callImpl();
      } finally {
        queue.semaphore.release();
      }
    }

    protected abstract T callImpl() throws Exception;
  }
}

那么就像这样使用它:
public class Test {

  public static void main(String[] args) {
    MyQueue<Integer> queue = new MyQueue<Integer>();
    MyQueue.Work<Integer> work = new MyQueue.Work<Integer>() {
      @Override
      protected Integer callImpl() {
        return 5;
      }
    };

    queue.put(work);
    MyQueue.Work<Integer> sameWork = queue.tryPoll();
  }
}

是的,每个队列中只会有一个类型,比如说 Work。我想要包装 Work 的原因是为了让类能够保证在 Work#call() 之后释放信号量。 - Emil Sit
我已经用一个初始实现替换了我的答案。这应该是静态类型安全的。 - user545680
唯一的问题是,一个“Work”对象可以传递一个与其放置的“Queue”不匹配的“Queue”,即它将释放错误的“Semaphore”。如果您不确保在Javadoc中清楚地使用它,这将会导致一些严重的头痛。 - user545680
好的,我又更新了一下。现在队列已经为工作对象设置了队列,尝试将工作放入第二个队列将抛出异常。我还没有使它线程安全地同时将工作放入队列,但这就留给你了。^_^ - user545680
所以 - 我解决了信号量问题,现在返回值不是某个包装类,而是一个用户创建的包装类。这意味着用户可以执行 instanceof 检查来向下转换为他们的 WorkObjectWithSuperAwesomeMethods 类,尽管这仍然破坏了使用继承的思想。 - user545680
显示剩余5条评论

0

我想可以使用一个大小限制为我想要的正在进行的工作量的 newFixedThreadPool,但是这个队列试图管理跨越许多不同资源的广泛工作池中的工作。我想通过线程池限制总并行性(因此不想为每种资源类型使用单独的线程池),并且还要限制外部资源的个别并行性(由信号量管理)。这有意义吗? - Emil Sit

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接