使用AtomicBoolean实现Java中的循环调度算法

7
我希望在向外部系统发送请求时实现严格的轮询调度。有两个外部系统服务器,第一个请求应该发送到“System1”,第二个请求必须发送到“System2”,下一个请求再次发送到“System1”以此类推。
由于我只有两个服务器可以发送请求,并且希望获得最大的性能而不会出现任何阻塞和上下文切换,因此我选择了 AtomicBoolean,因为它利用了 CAS 操作。
我的实现类如下:
1. RoundRobinTest.java
package com.concurrency;

import java.util.Iterator;

public class RoundRobinTest 
{
    public static void main(String[] args) 
    {
        for (int i = 0; i < 500; i++) 
        {
            new Thread(new RoundRobinLogic()).start();
        }
        try 
        {
            // Giving a few seconds for the threads to complete
            Thread.currentThread().sleep(2000);
            Iterator<String> output = RoundRobinLogic.output.iterator();
            int i=0;
            while (output.hasNext()) 
            {
                System.out.println(i+++":"+output.next());
                // Sleeping after each out.print 
                Thread.currentThread().sleep(20);
            }
        } 
        catch (Exception ex) 
        {
            // do nothing
        }
    }

}

2. RoundRobinLogic.java(带有静态AtomicBoolean对象的类)

package com.concurrency;

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicBoolean;

public class RoundRobinLogic implements Runnable 
{
    private static AtomicBoolean bool = new AtomicBoolean(true);

    public static Queue<String> output = new ConcurrentLinkedDeque<>();

    @Override
    public void run() 
    {
        if(bool.getAndSet(false))
        {
            // Sending the request to first system
            output.add("Request to System1");
        }
        else if(!bool.getAndSet(true))
        {
            // Sending the request to first system
            output.add("Request to System2");
        }       
    }

}

输出:


......................
314:Request to System1
315:Request to System2
316:Request to System1
317:Request to System2
318:Request to System1
319:Request to System1
320:Request to System2
321:Request to System2
322:Request to System1
323:Request to System2
324:Request to System1
325:Request to System2
......................

请求318和319已经发送到同一台服务器,此时AtomicBoolean失效。对于我的应用程序,1000-2000个线程可能同时访问共享对象。 从《Java并发实践》中,我看到了以下内容。

在高争用级别下,锁定倾向于优于原子变量,但在更现实的争用级别下,原子变量优于锁定。这是因为锁定通过暂停线程来响应争用,减少CPU使用率和共享内存总线上的同步流量。 对于低到中等争用,原子可扩展性更好;对于高争用,锁提供更好的避免竞争的能力。(基于CAS的算法也比基于锁的算法在单CPU系统上表现更好,因为除非线程在读取修改写入操作的中间被抢占,否则CAS始终成功。)

现在我有以下问题。

  1. 是否有其他有效的非阻塞方式来实现轮流发送请求?
  2. 在严重争用的情况下,AtomicBoolean是否会失败?我的理解是,由于重度争用,性能/吞吐量可能会下降。但在上面的示例中,AtomicBoolean失败了。为什么?

1
您同时运行了1k-2k个线程?您在多少处理器上运行它们? - Sean Bright
3
尽管boolqueue的状态彼此依赖,但它们的访问并没有同步。 - Mick Mnemonic
1
你的 run 方法没有利用 CAS。考虑这个问题:由于时间问题,你的 run 方法中的两次对 output.add 的调用都有可能不会被执行。 - Sean Bright
也许你可以使用这个方法,在原子长整型上加1,然后使用取余运算符计算模2。 - 1010
@SeanBright:我不明白输出.add会受到时间问题的影响。你能解释一下吗? - Albin
显示剩余3条评论
3个回答

11

在回答John的问题时,RoundRobinLogic的一个更加干净、或许略微更高效的实现方法是使用AtomicIntegerAtomicLong。这消除了将AtomicBoolean的当前值与新值进行比较的需要:

class RoundRobinLogic implements Runnable
{
    private static final AtomicInteger systemIndex = new AtomicInteger(1);

    public static final Queue<String> output = new ConcurrentLinkedDeque<>();

    @Override
    public void run()
    {
        if (systemIndex.incrementAndGet() % 2 == 0) {
            // Sending the request to first system
            output.add("Request to System1");
        } else {
            // Sending the request to second system
            output.add("Request to System2");
        }
    }
}

这样做可以很容易地将其扩展到其他系统:

class RemoteSystem
{
    private final String name;

    RemoteSystem(String name)
    {
        this.name = name;
    }

    public String name()
    {
        return name;
    }
}

class RoundRobinLogic implements Runnable
{
    private static final AtomicInteger systemIndex = new AtomicInteger(1);

    private static final RemoteSystem[] systems = new RemoteSystem[] {
        new RemoteSystem("System1"),
        new RemoteSystem("System2"),
        new RemoteSystem("System3"),
        new RemoteSystem("System4"),
    };

    public static final Queue<String> output = new ConcurrentLinkedDeque<>();

    @Override
    public void run()
    {
        RemoteSystem system = systems[systemIndex.incrementAndGet() % systems.length];

        // Sending the request to right system
        output.add("Request to " + system.name());
    }
}

@1010 他们应该是这样的,这就是为什么 OP 不能得到他想要的精确输出而不阻塞。我希望 OP 已经将代码作为示例上传了。 - John Vint
好的...如果发送到system1的请求比发送到system2的请求长,会发生什么?现在它只是一个固定的字符串,但实际上它将打开一个套接字,执行一些I/O等操作。这里介绍的方法只确保“大约一半”的请求发送到这两个系统中的每一个。 - Sean Bright
在实际情况下,我不需要“output”队列。所以你的解决方案可行。我会考虑这个解决方案以及@JohnVint的解决方案。 - Albin
1
@Albin,无论你调用的是队列还是外部系统,都不重要。除非你同步这两个方法调用,否则无法保证服务调用之间的完美交替——我认为这正是你提出问题的全部意义所在。 - Mick Mnemonic
1
正确。没有“完美交替”的保证。上面的代码唯一保证的是,在给定n个外部系统和x个请求的情况下,每个系统将处理x / n个请求。正如我在您的答案中所评论的那样,使用内置锁包装对外部系统的调用会显著降低吞吐量。 - Sean Bright
显示剩余4条评论

4

假设您不是使用一个Queue而是连接到一个实际系统的API。我看到的问题与以下相关:

    if(bool.getAndSet(false))
    {
        // Sending the request to first system
        output.add("Request to System1");
    }
    else if(!bool.getAndSet(true))
    {
        // Sending the request to second system
        output.add("Request to System2");
    }     

如果两个条件都失败怎么办?这怎么可能发生呢?假设进入第一个if时布尔值为true。然后你尝试将其设置为false,但另一个线程比你先完成了,所以你看到的是false。然后你尝试else if。现在如果当你到达else if时它是false,但另一个线程将其设置为true呢?在这种情况下,两次尝试都会失败。
我会重构它的样子:
while(true){
  boolean current = bool.get();
  if(bool.compareAndSet(current, !current){
     if(current){ 
        //send request to first system
     } else {
        //send request to second system
     }
     return;
  }
}

正如Sean Bright所提到的,因为您正在添加到队列中,即使您像上面那样实现,您仍然可能会看到一些值的顺序不正确,因为队列本身并不是与AtomicBoolean同步的一部分。


1
我已经开始着手处理我的答案,但是你的回答已经涵盖了大部分内容。我认为重要的是要提到,添加到队列中的结果不一定会按照OP想要看到的1、2、1、2等顺序排列。要实现这一点需要更多的同步。 - Sean Bright
2
我认为current!current返回的值不是原子操作的一部分。 - 1010
@1010 你为什么这样认为? - John Vint
1
@SeanBright 那是个很好的观点。1010应该把它作为一个答案添加进去。我不会篡改他的回答。 - John Vint
1
@Albin 你说得对。从概率的角度来看,与SeanBright的解决方案相比,你更有可能在这里看到竞争失败。我认为他的方法更好,说实话。话虽如此,我仍然保留我的答案,以说明你的竞争条件所在之处。 - John Vint
显示剩余5条评论

1

由于您的要求基本上是:实现一个原子操作,该操作

  1. 评估和翻转布尔值(或在通用n个服务器情况下评估模数并增加计数器)并且
  2. 根据步骤1的结果将条目插入队列,

您无法通过单独使步骤1和2线程安全来实现这一点;您必须将步骤1和2一起同步

这是一个简单的实现,应该可以工作:

import java.util.LinkedList;
import java.util.Queue;

public class RoundRobinLogic implements Runnable 
{
    private static boolean bool = true;
    public static final Queue<String> OUTPUT = new LinkedList<String>();
    private static final Object LOCK = new Object();

    @Override
    public void run() {
        synchronized (LOCK) {
            OUTPUT.add(bool ? "Request to System1" : "Request to System2");
            bool = !bool;
        }
    }
}

关于您的问题:

  1. 如果需要将两个高于处理器级别的操作同步,那么无法避免阻塞。java.util.concurrent.atomic中的类使用机器级原子指令,这就是为什么使用这些类的代码(通常取决于平台)不需要阻塞。
  2. 在您的实现中,AtomicBoolean并没有失败。相反,在读取布尔值和向队列添加元素之间存在竞争条件。

虽然这个解决方案可以满足 output 中的 String 以交替顺序出现的需求,但使用所有线程共享的内部锁将会引入更多的竞争。用 OUTPUT.add(someHttpClient.makeRequest(...)) 替换 OUTPUT.add(bool...),事情会很快失控。将 OUTPUT 设为 Queue<Future<String>> 也是一种选择。 - Sean Bright
这怎么是人为的要求呢?OP要求“严格轮询”。他最初的解决方案似乎在绝大多数情况下都正确地进行了修改,所以如果严格性不是真正的要求,我就不明白问题在哪里了。 - Mick Mnemonic
OP 已经多次表明 output.add() 将被替换为调用外部系统。我不知道如何更清楚地说明这一点,但如果你的 OUTPUT.add 被一个外部系统调用所替代,每个其他线程将会在内部锁上阻塞,这意味着仅有一个请求可以同时发生。所以是的,你的代码将在某种程度上“工作”,即 Queue 将按正确顺序排列,但这并不是 OP 想要的。@Albin,请随时加入讨论。 - Sean Bright

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接