都是什么?在IT技术中有何不同?

17

我有一个生产者和许多消费者。

  • 生产者速度快,生成了许多结果
  • 具有相同值的令牌需要依次处理
  • 具有不同值的令牌必须并行处理
  • 创建新的Runnables将非常昂贵,并且生产代码可以处理100k个令牌(为了创建Runnable,我必须向构造函数传递一些复杂的构建对象)

我能否用更简单的算法实现相同的结果?在重入锁同步块中嵌套似乎有点不自然。您可能会注意到任何竞争条件吗?

更新:我找到的第二个解决方案是使用3个集合。一个用于缓存生产者的结果,第二个是阻塞队列,第三个是使用列表跟踪正在进行的任务。再次有点太复杂了。

我的代码版本:

import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.locks.ReentrantLock;

public class Main1 {
    static class Token {
        private int order;
        private String value;
        Token() {

        }
        Token(int o, String v) {
            order = o;
            value = v;
        }

        int getOrder() {
            return order;
        }

        String getValue() {
            return value;
        }
    }

    private final static BlockingQueue<Token> queue = new ArrayBlockingQueue<Token>(10);
    private final static ConcurrentMap<String, Object> locks = new ConcurrentHashMap<String, Object>();
    private final static ReentrantLock reentrantLock = new ReentrantLock();
    private final static Token STOP_TOKEN = new Token();
    private final static List<String> lockList = Collections.synchronizedList(new ArrayList<String>());

    public static void main(String[] args) {
        ExecutorService producerExecutor = Executors.newSingleThreadExecutor();
        producerExecutor.submit(new Runnable() {
            public void run() {
                Random random = new Random();
                    try {
                        for (int i = 1; i <= 100; i++) {
                            Token token = new Token(i, String.valueOf(random.nextInt(1)));

                            queue.put(token);
                        }

                        queue.put(STOP_TOKEN);
                    }catch(InterruptedException e){
                        e.printStackTrace();
                    }
                }
        });

        ExecutorService consumerExecutor = Executors.newFixedThreadPool(10);
        for(int i=1; i<=10;i++) {

            // creating to many runnable would be inefficient because of this complex not thread safe object
            final Object dependecy = new Object(); //new ComplexDependecy()
            consumerExecutor.submit(new Runnable() {
                public void run() {
                    while(true) {
                        try {
                            //not in order


                            Token token = queue.take();
                            if (token == STOP_TOKEN) {
                                queue.add(STOP_TOKEN);
                                return;
                            }


                            System.out.println("Task start" + Thread.currentThread().getId() + " order "  + token.getOrder());

                            Random random = new Random();
                            Thread.sleep(random.nextInt(200)); //doLongRunningTask(dependecy)
                            lockList.remove(token.getValue());

                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
            }});

    }
}}

4
必须并行处理。强制让任何两个或更多的事情同时发生是困难的。不同的线程被允许并行执行任务,但在Java中没有担保这些任务一定会并行执行。 - Solomon Slow
10个回答

6

根据你的代码性质,保证具有相同值的令牌按顺序处理的唯一方法是等待STOP_TOKEN到达。

您需要单生产者-单消费者设置,其中消费者通过其值收集和排序令牌(例如放入Multimap中)。

只有这样,您才知道哪些令牌可以按顺序处理,哪些可以并行处理。

无论如何,我建议您查看LMAX Disruptor,它提供了一种非常有效的在线程间共享数据的方法。

与Executors不同,它没有同步开销,因为它是无锁的(这可能会给您带来良好的性能优势,具体取决于您处理数据的方式)。

使用两个Disruptors的解决方案

// single thread for processing as there will be only on consumer
Disruptor<InEvent> inboundDisruptor = new Disruptor<>(InEvent::new, 32, Executors.newSingleThreadExecutor());

// outbound disruptor that uses 3 threads for event processing
Disruptor<OutEvent> outboundDisruptor = new Disruptor<>(OutEvent::new, 32, Executors.newFixedThreadPool(3));

inboundDisruptor.handleEventsWith(new InEventHandler(outboundDisruptor));

// setup 3 event handlers, doing round robin consuming, effectively processing OutEvents in 3 threads
outboundDisruptor.handleEventsWith(new OutEventHandler(0, 3, new Object()));
outboundDisruptor.handleEventsWith(new OutEventHandler(1, 3, new Object()));
outboundDisruptor.handleEventsWith(new OutEventHandler(2, 3, new Object()));

inboundDisruptor.start();
outboundDisruptor.start();

// publisher code
for (int i = 0; i < 10; i++) {
    inboundDisruptor.publishEvent(InEventTranslator.INSTANCE, new Token());
}

入站disruptor上的事件处理程序只收集传入的令牌。当接收到STOP令牌时,它会将一系列令牌发布到出站disruptor以进行进一步处理。
public class InEventHandler implements EventHandler<InEvent> {

    private ListMultimap<String, Token> tokensByValue = ArrayListMultimap.create();
    private Disruptor<OutEvent> outboundDisruptor;

    public InEventHandler(Disruptor<OutEvent> outboundDisruptor) {
        this.outboundDisruptor = outboundDisruptor;
    }

    @Override
    public void onEvent(InEvent event, long sequence, boolean endOfBatch) throws Exception {
        if (event.token == STOP_TOKEN) {
            // publish indexed tokens to outbound disruptor for parallel processing
            tokensByValue.asMap().entrySet().stream().forEach(entry -> outboundDisruptor.publishEvent(OutEventTranslator.INSTANCE, entry.getValue()));
        } else {
            tokensByValue.put(event.token.value, event.token);
        }
    }
}

出站事件处理程序按相同值的令牌顺序逐个处理:

public class OutEventHandler implements EventHandler<OutEvent> {

    private final long order;
    private final long allHandlersCount;
    private Object yourComplexDependency;

    public OutEventHandler(long order, long allHandlersCount, Object yourComplexDependency) {
        this.order = order;
        this.allHandlersCount = allHandlersCount;
        this.yourComplexDependency = yourComplexDependency;
    }

    @Override
    public void onEvent(OutEvent event, long sequence, boolean endOfBatch) throws Exception {
        if (sequence % allHandlersCount != order ) {
            // round robin, do not consume every event to allow parallel processing
            return;
        }

        for (Token token : event.tokensToProcessSerially) {
            // do procesing of the token using your complex class
        }

    }
}

所需的其余基础设施(用途在Disruptor文档中描述):

public class InEventTranslator implements EventTranslatorOneArg<InEvent, Token> {

    public static final InEventTranslator INSTANCE = new InEventTranslator();

    @Override
    public void translateTo(InEvent event, long sequence, Token arg0) {
        event.token = arg0;
    }

}

public class OutEventTranslator implements EventTranslatorOneArg<OutEvent, Collection<Token>> {

    public static final OutEventTranslator INSTANCE = new OutEventTranslator();

    @Override
    public void translateTo(OutEvent event, long sequence, Collection<Token> tokens) {
        event.tokensToProcessSerially = tokens;
    }
}


public class InEvent {

    // Note that no synchronization is used here,
    // even though the field is used among multiple threads.
    // Memory barrier used by Disruptor guarantee changes are visible.
    public Token token;
}

public class OutEvent {
    // ... again, no locks.
    public Collection<Token> tokensToProcessSerially;

}

public class Token {
    String value;

}

我宁愿不使用外部库。 - johnlemon

6

您可以预先创建一组Runnables,它们将根据其顺序值将传入的任务(令牌)放入队列中。

正如评论中指出的那样,不能保证具有不同值的令牌始终并行执行(毕竟,您至少受到盒子中物理内核数量的限制)。但是,保证具有相同顺序的令牌将按照到达的顺序执行。

示例代码:

/**
 * Executor which ensures incoming tasks are executed in queues according to provided key (see {@link Task#getOrder()}).
 */
public class TasksOrderingExecutor {

    public interface Task extends Runnable {
        /**
         * @return ordering value which will be used to sequence tasks with the same value.<br>
         * Tasks with different ordering values <i>may</i> be executed in parallel, but not guaranteed to.
         */
        String getOrder();
    }

    private static class Worker implements Runnable {

        private final LinkedBlockingQueue<Task> tasks = new LinkedBlockingQueue<>();

        private volatile boolean stopped;

        void schedule(Task task) {
            tasks.add(task);
        }

        void stop() {
            stopped = true;
        }

        @Override
        public void run() {
            while (!stopped) {
                try {
                    Task task = tasks.take();
                    task.run();
                } catch (InterruptedException ie) {
                    // perhaps, handle somehow
                }
            }
        }
    }

    private final Worker[] workers;
    private final ExecutorService executorService;

    /**
     * @param queuesNr nr of concurrent task queues
     */
    public TasksOrderingExecutor(int queuesNr) {
        Preconditions.checkArgument(queuesNr >= 1, "queuesNr >= 1");
        executorService = new ThreadPoolExecutor(queuesNr, queuesNr, 0, TimeUnit.SECONDS, new SynchronousQueue<>());
        workers = new Worker[queuesNr];
        for (int i = 0; i < queuesNr; i++) {
            Worker worker = new Worker();
            executorService.submit(worker);
            workers[i] = worker;
        }
    }

    public void submit(Task task) {
        Worker worker = getWorker(task);
        worker.schedule(task);
    }

    public void stop() {
        for (Worker w : workers) w.stop();
        executorService.shutdown();
    }

    private Worker getWorker(Task task) {
        return workers[task.getOrder().hashCode() % workers.length];
    }
}

生产者比消费者快,你的内存中会有很多任务。此外,负载可能不会在工作进程之间均匀分配。 - johnlemon
@danip 关于速度持续差异的问题 - 那么您需要切换到有界阻塞队列,并接受由于阻塞而产生的一些延迟。为了提高吞吐量,您需要在多个盒子之间分配负载。 - Victor Sorokin
我认为使用多个框会使问题变得过于复杂。我可以利用您提出的概念按顺序分配任务,但我必须使它统一。 - johnlemon

5
如果您有许多不同的令牌,那么最简单的解决方案是创建一些单线程执行器(大约为核心数的2倍),然后通过其令牌的哈希确定将每个任务分配给哪个执行器。
这样,所有具有相同令牌的任务都将进入同一个执行器并按顺序执行,因为每个执行器只有一个线程。
如果您对调度公平性有一些未明确说明的要求,那么很容易通过在分配之前使生产者线程排队其请求(或阻塞),直到每个执行器未完成的请求少于10个来避免任何重大不平衡。

这是一个我会考虑的解决方案。生产者分发系统可能会使解决方案比我的示例更复杂,但也许我可以忽略它。 - johnlemon
我认为你的回答最接近要点。该算法将把令牌放入不同的消费者中并跟踪它们。对于每个新令牌,下一个消费者将按轮换使用。如果一个令牌已经被处理过,则必须由同一消费者处理。 - johnlemon
在大多数这样的情况下,您可以只使用(token.hashCode()&0x7FFFFFFF)%number_of_consumers来决定放置每个元素的位置,而无需跟踪任何内容。 - Matt Timmermans
有没有可能会有一些消费者得不到任何东西? - johnlemon
如果您有许多不同的令牌,那么这是极不可能的。但这并不重要,因为您拥有比处理器核心更多的消费者,所以您的所有CPU仍将保持繁忙状态。 - Matt Timmermans

4
以下解决方案仅使用单个Map,生产者和消费者使用该Map按顺序处理每个订单号的订单,同时并行处理不同的订单号。以下是代码:
public class Main {

    private static final int NUMBER_OF_CONSUMER_THREADS = 10;
    private static volatile int sync = 0;

    public static void main(String[] args) {
        final ConcurrentHashMap<String,Controller> queues = new ConcurrentHashMap<String, Controller>();
        final CountDownLatch latch = new CountDownLatch(NUMBER_OF_CONSUMER_THREADS);
        final AtomicBoolean done = new AtomicBoolean(false);

        // Create a Producer
        new Thread() {
            {
                this.setDaemon(true);
                this.setName("Producer");
                this.start();
            }

            public void run() {
                Random rand = new Random();

                for(int i =0 ; i < 1000 ; i++) {
                    int order = rand.nextInt(20);
                    String key = String.valueOf(order);
                    String value = String.valueOf(rand.nextInt());
                    Controller controller = queues.get(key);
                    if (controller == null) {
                        controller = new Controller();
                        queues.put(key, controller);
                    }
                    controller.add(new Token(order, value));
                    Main.sync++;
                }

                done.set(true);
            }
        };

        while (queues.size() < 10) {
            try {
                // Allow the producer to generate several entries that need to
                // be processed.
                Thread.sleep(5000);
            } catch (InterruptedException e1) {
                // TODO Auto-generated catch block
                e1.printStackTrace();
            }
        }

        // System.out.println(queues);

        // Create the Consumers
        ExecutorService consumers = Executors.newFixedThreadPool(NUMBER_OF_CONSUMER_THREADS);
        for(int i = 0 ; i < NUMBER_OF_CONSUMER_THREADS ; i++) {
            consumers.submit(new Runnable() {
                private Random rand = new Random();

                public void run() {
                    String name = Thread.currentThread().getName();
                    try {
                        boolean one_last_time = false;
                        while (true) {
                            for (Map.Entry<String, Controller> entry : queues.entrySet()) {
                                Controller controller = entry.getValue();
                                if (controller.lock(this)) {
                                    ConcurrentLinkedQueue<Token> list = controller.getList();
                                    Token token;
                                    while ((token = list.poll()) != null) {
                                        try {
                                            System.out.println(name + " processing order: " + token.getOrder()
                                                    + " value: " + token.getValue());
                                            Thread.sleep(rand.nextInt(200));
                                        } catch (InterruptedException e) {
                                        }
                                    }
                                    int last = Main.sync;
                                    queues.remove(entry.getKey());
                                    while(done.get() == false && last == Main.sync) {
                                        // yield until the producer has added at least another entry
                                        Thread.yield();
                                    }
                                    // Purge any new entries added
                                    while ((token = list.poll()) != null) {
                                        try {
                                            System.out.println(name + " processing order: " + token.getOrder()
                                                    + " value: " + token.getValue());
                                            Thread.sleep(200);
                                        } catch (InterruptedException e) {
                                        }
                                    }
                                    controller.unlock(this);
                                }
                            }
                            if (one_last_time) {
                                return;
                            }
                            if (done.get()) {
                                one_last_time = true;
                            }
                        }
                    } finally {
                        latch.countDown();
                    }
                }
            });
        }
        try {
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        consumers.shutdown();
        System.out.println("Exiting.. remaining number of entries: " + queues.size());
    }

}

请注意,Main类包含一个queues实例,它是一个Map。映射键是您想要由消费者按顺序处理的订单ID。值是一个Controller类,其中将包含与该订单ID相关联的所有订单。
生产者将生成订单并将订单(Token)添加到其关联的Controller中。消费者将迭代队列映射值,并调用Controller锁定方法以确定是否可以处理特定订单ID的订单。如果锁定返回false,则会检查下一个Controller实例。如果锁定返回true,则会处理所有订单,然后检查下一个Controller。
更新:添加了同步整数,用于保证从队列映射中删除Controller实例时,其所有条目都将被消耗。在消费者代码中存在逻辑错误,其中unlock方法被调用得太早。
Token类与您在此处发布的类似。
class Token {
    private int order;
    private String value;

    Token(int order, String value) {
        this.order = order;
        this.value = value;
    }

    int getOrder() {
        return order;
    }

    String getValue() {
        return value;
    }

    @Override
    public String toString() {
        return "Token [order=" + order + ", value=" + value + "]\n";
    }
}

下面的Controller类用于确保线程池中只有一个线程处理订单。 lock/unlock方法用于确定哪个线程将被允许处理订单。
class Controller {

    private ConcurrentLinkedQueue<Token> tokens = new ConcurrentLinkedQueue<Token>();
    private ReentrantLock lock = new ReentrantLock();
    private Runnable current = null;

    void add(Token token) {
        tokens.add(token);
    }

    public ConcurrentLinkedQueue<Token> getList() {
        return tokens;
    }

    public void unlock(Runnable runnable) {
        lock.lock();
        try {
            if (current == runnable) {
                current = null;
            }
        } finally {
            lock.unlock();
        }
    }

    public boolean lock(Runnable runnable) {
        lock.lock();
        try {
            if (current == null) {
                current = runnable;
            }
        } finally {
            lock.unlock();
        }
        return current == runnable;
    }

    @Override
    public String toString() {
        return "Controller [tokens=" + tokens + "]";
    }

}

关于实现的其他信息。它使用CountDownLatch来确保在进程退出之前处理所有产生的订单。done变量就像您的STOP_TOKEN变量一样。

实现中确实存在一个问题,您需要解决这个问题。问题在于当所有订单都被处理完毕时,它不会清除订单id的控制器。这将导致线程池中的线程被分配到不含任何订单的控制器中。这将浪费CPU周期,而这些周期可以用来执行其他任务。


你正在内存中保存所有生成的结果。这是我需要避免的事情。 - johnlemon
poll方法将从链接队列中删除并获取条目,结果不会保存在内存中。 - Claudio Corsi
生产者比所有的消费者都要快,因此队列中的结果会迅速增加。 - johnlemon

4

你需要做的是确保具有相同值的令牌不会同时被处理,而无需过于繁琐。你提供的代码太混乱了,难以理解你的意图(代码无法编译,并且存在许多未使用的变量、锁和映射,这些都是创建但从未使用过的)。看起来你在过度思考。你只需要一个队列和一个映射就可以了。 像这样:

   class Consumer implements Runnable {
     ConcurrentHashMap<String, Token> inProcess;
     BlockingQueue<Token> queue;

     public void run() {
        Token token = null;
        while ((token = queue.take()) != null) {
           if(inProcess.putIfAbsent(token.getValue(), token) != null) {
              queue.put(token);
              continue;
           }
           processToken(token);
           inProcess.remove(token.getValue());
        }
     }
   }

代码在Java 8下编译正常。我不确定您遇到了什么编译问题。未使用的变量只是为了解释问题的复杂性。 - johnlemon
"具有相同值的令牌需要按顺序处理,因此您不能将一个令牌放在队列末尾,因为顺序将不会被保留。如果运行我的示例,您将看到正确的输出。" - johnlemon
@danip 当然,你创建的未使用变量越多,问题看起来就越复杂。你不应该创建变量“来展示复杂性”,每个变量都应该有自己清晰明确的目的。尝试一下,你会发现问题并没有你想象中那么复杂。 - Dima
@danip,我不确定你在发布队列方面遇到了什么问题,但如果你所说的“顺序”是指“按顺序”,那么你的整个设计需要重新调整,这不是正确的方法:让每个消费者查看自己的队列,并让生产者通过值的哈希码或某些类似的函数将令牌分配到队列中。对于未来,请记住:“顺序”意味着“不并行”,不一定按顺序。 - Dima
顺序通常遵循数字或字母顺序 :) - johnlemon
不,“sequential”意思是“按顺序”,或者“一个接一个地”。无论如何,你不需要说服我,那只是给你在描述技术问题时更准确地表达的建议,以便人们理解你正在描述什么。如果你在描述中留下一些需要他们“猜测”的东西,你可以肯定有些猜测会与你期望的不同。如果你想要按顺序完成某件事,只需说“按顺序”,这比“顺序执行”更短,更能描述要求。 - Dima

3

我不完全确定我是否理解了问题,但我会尝试提供一种算法。

这里有以下角色:

  • 一个任务队列
  • 一个空闲执行器池
  • 一个正在处理中的令牌集合
  • 一个控制器

然后,

  • 最初,所有执行器都是可用的,集合为空

  • 控制器选择一个可用的执行器并遍历队列查找一个具有未在正在处理集合中的令牌任务

    • 令牌添加到正在处理集合
    • 分配执行器来处理任务
    • 回到队列开头
  • 执行器完成处理时,它将从集合中删除令牌并将自己重新添加到


唯一的问题是我需要一个阻塞队列,因为生产者非常快。我能找到的唯一实现是ArrayBlockingQueue,但它是FIFO,所以“通过队列”部分不容易实现。 - johnlemon
这并不是一个队列,因为你需要在尾部添加元素的同时移除无序的元素。链表可能是更合适的数据结构。至于阻塞,对于这种复杂度的项目,将现有的集合类进行包装应该是完全可行的。 - Miserable Variable

3
有相同值的令牌需要按顺序处理,确保任何两个操作按顺序进行的方法是在同一个线程中执行它们。你可以拥有多个工作线程,并拥有一个 Map。每当您获得一个未见过的令牌时,您将随机选择一个线程,并将令牌和线程输入到 Map 中。从那时起,您将使用相同的线程来执行与该令牌相关联的任务。
创建新的 Runnable 对象可能非常昂贵。Runnable 是一个接口。创建实现 Runnable 接口的新对象不会比创建任何其他类型的对象更昂贵。

这段代码是一个抽象的例子。为了创建实现Runnable接口的特定对象,我需要将一些复杂的对象传递给构造函数(也许您熟悉JAXB中的Marshaller)。在这种情况下,我应该采用什么策略? - johnlemon
其次,“我会随机选择一个线程”,我能否使用执行器来实现这个功能,还是必须直接管理线程? - johnlemon
@danip 我不知道你的第一个问题,但是关于执行器,是的。在我回答中提到“Thread”的地方,你可以将其更改为“ExecutorService”,然后通过调用Executors.newSingleThreadExecutor()来创建每个执行器。 - Solomon Slow

3
也许我误解了什么。但是似乎最初将具有相同值的令牌与具有不同值的令牌过滤到两个不同的队列中会更容易一些。
然后针对顺序处理使用流(Stream)中的map或foreach方法。其余部分可以简单地使用并行流版本。
如果您在生产环境中的令牌是惰性生成的,并且每次只获取一个,则可以制作某种筛选器来将它们分配到两个不同的队列中。
如果您能够使用Streams实现,我建议这样做,因为它们简单、易于使用且速度快!
我举了一个简短的例子来说明我的意思。在这种情况下,数字令牌有点人工构造,但那不重要。此外,这两个流都是在主线程上启动的,这可能也不理想。 https://docs.oracle.com/javase/8/docs/api/java/util/stream/Stream.html
public static void main(String args[]) {
    ArrayList<Token> sameValues = new ArrayList<Token>();
    ArrayList<Token> distinctValues = new ArrayList<Token>();
    Random random = new Random();
    for (int i = 0; i < 100; i++) {
        int next = random.nextInt(100);
        Token n = new Token(i, String.valueOf(next));
        if (next == i) {
            sameValues.add(n);
        } else {
            distinctValues.add(n);
        }
    }       
    distinctValues.stream().parallel().forEach(token -> System.out.println("Distinct: " + token.value));
    sameValues.stream().forEach(token -> System.out.println("Same: " + token.value));       
}

3

一种做法是为序列处理和并行处理分别设置一个执行器。我们还需要一个单线程的管理服务,它将决定哪个服务令牌需要提交进行处理。 //队列将由两个线程共享。包含生产者生成的令牌。
BlockingQueue tokenList = new ArrayBlockingQueue(10);

    private void startProcess() {
    ExecutorService producer = Executors.newSingleThreadExecutor();
    final ExecutorService consumerForSequence = Executors
            .newSingleThreadExecutor();
    final ExecutorService consumerForParallel = Executors.newFixedThreadPool(10);
    ExecutorService manager = Executors.newSingleThreadExecutor();

    producer.submit(new Producer(tokenList));

    manager.submit(new Runnable() {

        public void run() {
            try {
                while (true) {
                    Token t = tokenList.take();
                    System.out.println("consumed- " + t.orderid
                            + " element");

                    if (t.orderid % 7 == 0) { // any condition to check for sequence processing

                        consumerForSequence.submit(new ConsumerForSequenceProcess(t));

                    } else {

                        ConsumerForParallel.submit(new ConsumerForParallelProcess(t));

                    }
                }
            }

            catch (InterruptedException e) { // TODO Auto-generated catch
                // block
                e.printStackTrace();
            }

        }
    });
}

你的代码将序列处理限制为单个消费者。多个消费者应该能够按顺序处理令牌。 - johnlemon
是的,但这可以处理。我们需要使用一个阻塞队列来保存需要按顺序处理的令牌,然后我们可以在该队列上使用一组消费者而不是一个消费者来拾取令牌进行处理。如果有任何建议,请纠正我。 - yuvraj

2
我认为这个任务背后有一个更基本的设计问题,但好吧。我无法从您的问题描述中判断您是想要按顺序执行还是只想让由单个标记描述的任务操作是原子性/事务性的。下面我提出的建议更像是解决这个问题的“临时补救”而不是真正的解决方案。
对于真正的“有序执行”情况,我提出了一种基于队列代理的解决方案,它可以排序输出:
  1. 定义一个实现 Queue 接口的类,提供一个生成代理队列的工厂方法,这些代理队列由一个单一的队列对象表示给生产者使用;该工厂方法还应注册这些代理队列对象。将元素添加到输入队列时,如果它与任何输出队列中的元素匹配,则应将其直接添加到相应的输出队列中。否则将其添加到任何一个(最短的)输出队列中(要高效地实现此检查)。或者(稍微更好一点):当任何一个输出队列为空时再执行上述操作。

  2. 为每个可运行的消费者定义一个存储单个 Queue 接口的字段(而不是访问单个对象)。通过以上定义的工厂方法初始化此字段。

对于事务性情况,我认为更容易的方法是跨越比你的核心更多的线程(使用统计数据来计算),并在较低(对象)级别上实现阻塞机制。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接