Java中的并发请求处理及其限制

7
假设我需要按以下方式处理三种类型的请求:A,B和C:
  • 请求同时处理。
  • 有最多K(<= 3)个请求同时进行处理。
  • 不能同时处理相同类型的请求。

更一般地,类型的数量为N,同时处理的请求数量为K<= N。

您将如何使用java.util.concurrent在Java中实现它?

4个回答

2

您不能同时处理K个请求,否则会违反第二个规则。最大并发请求数量是数字类型。在您的情况下为3。因此,请创建三个队列并将它们附加到三个线程上。这是唯一的方法。Executors.newSingleThreadExecutor 实现了这种技术。

public static void main(String[] args) {
    int N = 2;
    int K = 3;
    List<Executor> executors = new ArrayList<Executor>(N);
    for(int i = 0; i < N; i++){
        executors.add(Executors.newSingleThreadExecutor());
    }
    Map<Type, Executor> typeExecutors = new HashMap<Type, Executor>(K);
    int i = 0;
    for(Type t : Type.values()){
        typeExecutors.put(t, executors.get(i++ % executors.size()));
    }
}

enum Type{
    T1, T2, T3
}

同意,在第一次阅读问题后,我也认为Executors(大小为k的newFixedThreadPool)是最好的选择。然而,只允许某种类型的最多1个作业在任何给定时刻运行的限制使得您的线程+队列方案更好。 - Shivan Dragon
如果K = 2怎么办?更一般地,类型的数量为N且K < N。我将更新问题。 - Michael
因此,创建K个线程并在它们之间分配所有类型的队列。对于K=2,N=3的情况,第一个线程应该以轮询方式轮流处理两个队列,而第二个线程则只监听单个队列。 - Mikhail
我不喜欢轮询 :( - Michael
请使用其中一个BlockingQueue实现。它有一个take()方法,如果队列为空,则会阻塞。 - Mikhail
请问您能否更新您的答案,提供一个适用于N和K < N的一般情况下基于队列的解决方案? - Michael

0

0

你的问题领域可以建模为两个数据结构,我称之为pending(将类型映射到无界队列的任务 - 这是任务等待运行的地方)和running(每种类型最多有一个任务准备好运行或由执行器实际运行)。

K约束必须应用于running:它最多具有K个TypeTask映射。

重点是,您为所有任务处理分配的线程数完全与并发约束处理无关:您的线程池选择应该受到要执行的任务类型(IO / CPU绑定?)等因素的支配,而不是并发约束。

一种实现:

public class Foo {

    enum TaskType { A, B, C }

    class Task {
        TaskType type;
        Runnable runnable;
        volatile boolean running;
    }

    Map<TaskType, Queue<Task>> pending = new HashMap<TaskType, Queue<Task>>();

    Map<TaskType, Task> running = new HashMap<TaskType, Task>();

    ExecutorService executor = null; // Executor implementation is irrelevant to the problem

    /** Chooses a task of a random type between those not running. */
    TaskType choosePending(){
        Set running_types = running.keySet();
        running_types.removeAll(Arrays.asList(pending.keySet()));
        List shuffled = new ArrayList(running_types);
        Collections.shuffle(shuffled);
        return (TaskType) shuffled.get(0);
    }

    // note that max concurrency != parallelism level (which the executor is responsible for)
    final int MAX_CONCURRENCY = 3;

    void produce(){
        synchronized(running){
            if (running.size() < MAX_CONCURRENCY) {
                synchronized (pending){
                    TaskType t = choosePending();
                    running.put(t, pending.get(t).remove()) ;
                }
            }
        }
    }

    {
        new Thread(new Runnable() {
            public void run() {
                while (true) produce();
            }
        }).start();
    }

    Task chooseRunning(){
         for (Task t : running.values()){
             if (!t.running){
                 return t;
             }
         }
        return null;
    }

    void consume(){
        final Task t;
        synchronized (running){
            t = chooseRunning();
            if (t != null){
                t.running = true;
                executor.execute(new Runnable() {
                    public void run() {
                        t.runnable.run();
                        synchronized (running) {
                            running.remove(t);
                        }
                    }
                });
            }
        }
    }

    {
        new Thread(new Runnable() {
            public void run() {
                while (true) consume();
            }
        }).start();
    }

}

0

请求将被并发处理。

您可以使用Executor服务。

同时最多有K个请求需要并发处理。

在执行器中,设置最大线程数。

不能同时处理相同类型的请求。

您可以考虑为每种请求类型设置不同的锁。只需确保如果线程无法在指定时间内获取请求的锁,则应放弃并进行下一个任务处理。


你能详细说明一下你提出的锁以满足第三个约束条件吗? - Michael

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接