Java中线程安全的循环缓冲区

27

考虑几个并行运行的Web服务器实例。每个服务器都持有对单个共享“状态管理器”的引用,其作用是保留所有服务器的最近N个请求。

例如(N=3):

Server a: "Request id = ABCD"        Status keeper=["ABCD"]
Server b: "Request id = XYZZ"        Status keeper=["ABCD", "XYZZ"] 
Server c: "Request id = 1234"        Status keeper=["ABCD", "XYZZ", "1234"]
Server b: "Request id = FOO"         Status keeper=["XYZZ", "1234", "FOO"]
Server a: "Request id = BAR"         Status keeper=["1234", "FOO", "BAR"]

在任何时候,"状态管理器"可能会被一个监控应用程序调用,该应用程序读取这些最后的N个请求以获取SLA报告。

在Java中实现此生产者-消费者方案的最佳方式是什么,如何使Web服务器优先于SLA报告?

CircularFifoBuffer似乎是容纳请求的适当数据结构,但我不确定实现有效并发的最佳方法是什么。


定义“更高优先级”。如果报告已经开始读取缓冲区,那么它应该中断并重新开始,如果有人想要写入该缓冲区。这是否会导致饥饿? - Emil Vikström
阅读操作非常稀少,可能每隔几分钟才会进行一次。写入操作非常频繁,可能高峰时每秒钟会有数百次调用。我可以容忍偶尔的不一致。 - Adam Matan
@Adam 通常 N 的规模有多大?以 N=3 为例,我不会采用中央“状态管理器”服务器方法,因为发送到它的数据中有99%都是被丢弃的。 - fishinear
在1,000和10,000之间。 - Adam Matan
考虑到这些较低的数字,我会选择一个同步的CircularFifoBuffer,就像@maydeTo建议的那样。你不会因此引入任何显著的瓶颈。 - fishinear
显示剩余2条评论
8个回答

22
Buffer fifo = BufferUtils.synchronizedBuffer(new CircularFifoBuffer());

2
只要初始化代码不会出现竞争,就无所谓了。 - MahdeTo
1
BufferUtils是从哪里来的?我尝试在gradle文件中使用来自Apache的"compile 'org.apache.commons:commons-collections4:4.1'",但它不在那里... - android developer
2
@androiddeveloper 在Apache commons collections4中,CircularFifoBuffer已被替换为CircularFifoQueue,可以通过使用QueueUtils进行包装来实现同步。 - Sometimes_Confused
@androiddeveloper 有一种解决并发问题的方法。与(较新的)JDK集合一样,基本队列集合为了提高性能而未同步。如果需要处理并发(即线程安全)的集合,则应使用适当的同步包装器来包装集合;在CircularFifoQueue的情况下,请参见QueueUtils.synchronizedQueue(Queue<E> queue)(正如我之前的评论所述)。 - Sometimes_Confused
有时候使用同步解决方案并不适合并发处理,它只能确保线程安全。并发是指多个线程可以同时执行“有问题”的操作,而无需等待其他线程完成。 - android developer
显示剩余3条评论

7

这是一个无锁环形缓冲区的实现。它实现了一个固定大小的缓冲区 - 没有FIFO功能。我建议您为每个服务器存储一个请求的Collection。这样你的报告可以进行过滤,而不是让你的数据结构进行过滤。

/**
 * Container
 * ---------
 * 
 * A lock-free container that offers a close-to O(1) add/remove performance.
 * 
 */
public class Container<T> implements Iterable<T> {

  // The capacity of the container.
  final int capacity;
  // The list.
  AtomicReference<Node<T>> head = new AtomicReference<Node<T>>();
  // TESTING {
  AtomicLong totalAdded = new AtomicLong(0);
  AtomicLong totalFreed = new AtomicLong(0);
  AtomicLong totalSkipped = new AtomicLong(0);

  private void resetStats() {
    totalAdded.set(0);
    totalFreed.set(0);
    totalSkipped.set(0);
  }
  // TESTING }

  // Constructor
  public Container(int capacity) {
    this.capacity = capacity;
    // Construct the list.
    Node<T> h = new Node<T>();
    Node<T> it = h;
    // One created, now add (capacity - 1) more
    for (int i = 0; i < capacity - 1; i++) {
      // Add it.
      it.next = new Node<T>();
      // Step on to it.
      it = it.next;
    }
    // Make it a ring.
    it.next = h;
    // Install it.
    head.set(h);
  }

  // Empty ... NOT thread safe.
  public void clear() {
    Node<T> it = head.get();
    for (int i = 0; i < capacity; i++) {
      // Trash the element
      it.element = null;
      // Mark it free.
      it.free.set(true);
      it = it.next;
    }
    // Clear stats.
    resetStats();
  }

  // Add a new one.
  public Node<T> add(T element) {
    // Get a free node and attach the element.
    totalAdded.incrementAndGet();
    return getFree().attach(element);
  }

  // Find the next free element and mark it not free.
  private Node<T> getFree() {
    Node<T> freeNode = head.get();
    int skipped = 0;
    // Stop when we hit the end of the list 
    // ... or we successfully transit a node from free to not-free.
    while (skipped < capacity && !freeNode.free.compareAndSet(true, false)) {
      skipped += 1;
      freeNode = freeNode.next;
    }
    // Keep count of skipped.
    totalSkipped.addAndGet(skipped);
    if (skipped < capacity) {
      // Put the head as next.
      // Doesn't matter if it fails. That would just mean someone else was doing the same.
      head.set(freeNode.next);
    } else {
      // We hit the end! No more free nodes.
      throw new IllegalStateException("Capacity exhausted.");
    }
    return freeNode;
  }

  // Mark it free.
  public void remove(Node<T> it, T element) {
    totalFreed.incrementAndGet();
    // Remove the element first.
    it.detach(element);
    // Mark it as free.
    if (!it.free.compareAndSet(false, true)) {
      throw new IllegalStateException("Freeing a freed node.");
    }
  }

  // The Node class. It is static so needs the <T> repeated.
  public static class Node<T> {

    // The element in the node.
    private T element;
    // Are we free?
    private AtomicBoolean free = new AtomicBoolean(true);
    // The next reference in whatever list I am in.
    private Node<T> next;

    // Construct a node of the list
    private Node() {
      // Start empty.
      element = null;
    }

    // Attach the element.
    public Node<T> attach(T element) {
      // Sanity check.
      if (this.element == null) {
        this.element = element;
      } else {
        throw new IllegalArgumentException("There is already an element attached.");
      }
      // Useful for chaining.
      return this;
    }

    // Detach the element.
    public Node<T> detach(T element) {
      // Sanity check.
      if (this.element == element) {
        this.element = null;
      } else {
        throw new IllegalArgumentException("Removal of wrong element.");
      }
      // Useful for chaining.
      return this;
    }

    public T get () {
      return element;
    }

    @Override
    public String toString() {
      return element != null ? element.toString() : "null";
    }
  }

  // Provides an iterator across all items in the container.
  public Iterator<T> iterator() {
    return new UsedNodesIterator<T>(this);
  }

  // Iterates across used nodes.
  private static class UsedNodesIterator<T> implements Iterator<T> {
    // Where next to look for the next used node.

    Node<T> it;
    int limit = 0;
    T next = null;

    public UsedNodesIterator(Container<T> c) {
      // Snapshot the head node at this time.
      it = c.head.get();
      limit = c.capacity;
    }

    public boolean hasNext() {
      // Made into a `while` loop to fix issue reported by @Nim in code review
      while (next == null && limit > 0) {
        // Scan to the next non-free node.
        while (limit > 0 && it.free.get() == true) {
          it = it.next;
          // Step down 1.
          limit -= 1;
        }
        if (limit != 0) {
          next = it.element;
        }
      }
      return next != null;
    }

    public T next() {
      T n = null;
      if ( hasNext () ) {
        // Give it to them.
        n = next;
        next = null;
        // Step forward.
        it = it.next;
        limit -= 1;
      } else {
        // Not there!!
        throw new NoSuchElementException ();
      }
      return n;
    }

    public void remove() {
      throw new UnsupportedOperationException("Not supported.");
    }
  }

  @Override
  public String toString() {
    StringBuilder s = new StringBuilder();
    Separator comma = new Separator(",");
    // Keep counts too.
    int usedCount = 0;
    int freeCount = 0;
    // I will iterate the list myself as I want to count free nodes too.
    Node<T> it = head.get();
    int count = 0;
    s.append("[");
    // Scan to the end.
    while (count < capacity) {
      // Is it in-use?
      if (it.free.get() == false) {
        // Grab its element.
        T e = it.element;
        // Is it null?
        if (e != null) {
          // Good element.
          s.append(comma.sep()).append(e.toString());
          // Count them.
          usedCount += 1;
        } else {
          // Probably became free while I was traversing.
          // Because the element is detached before the entry is marked free.
          freeCount += 1;
        }
      } else {
        // Free one.
        freeCount += 1;
      }
      // Next
      it = it.next;
      count += 1;
    }
    // Decorate with counts "]used+free".
    s.append("]").append(usedCount).append("+").append(freeCount);
    if (usedCount + freeCount != capacity) {
      // Perhaps something was added/freed while we were iterating.
      s.append("?");
    }
    return s.toString();
  }
}

请注意,这与O1 put和get非常接近。一个“分隔符”仅在第一次发出“”,然后从那时起就发出其参数。
编辑:添加了测试方法。
// ***** Following only needed for testing. *****
private static boolean Debug = false;
private final static String logName = "Container.log";
private final static NamedFileOutput log = new NamedFileOutput("C:\\Junk\\");

private static synchronized void log(boolean toStdoutToo, String s) {
  if (Debug) {
    if (toStdoutToo) {
      System.out.println(s);
    }
    log(s);
  }
}

private static synchronized void log(String s) {
  if (Debug) {
    try {
      log.writeLn(logName, s);
    } catch (IOException ex) {
      ex.printStackTrace();
    }
  }
}
static volatile boolean testing = true;

// Tester object to exercise the container.
static class Tester<T> implements Runnable {
  // My name.

  T me;
  // The container I am testing.
  Container<T> c;

  public Tester(Container<T> container, T name) {
    c = container;
    me = name;
  }

  private void pause() {
    try {
      Thread.sleep(0);
    } catch (InterruptedException ex) {
      testing = false;
    }
  }

  public void run() {
    // Spin on add/remove until stopped.
    while (testing) {
      // Add it.
      Node<T> n = c.add(me);
      log("Added " + me + ": " + c.toString());
      pause();
      // Remove it.
      c.remove(n, me);
      log("Removed " + me + ": " + c.toString());
      pause();
    }
  }
}
static final String[] strings = {
  "One", "Two", "Three", "Four", "Five",
  "Six", "Seven", "Eight", "Nine", "Ten"
};
static final int TEST_THREADS = Math.min(10, strings.length);

public static void main(String[] args) throws InterruptedException {
  Debug = true;
  log.delete(logName);
  Container<String> c = new Container<String>(10);

  // Simple add/remove
  log(true, "Simple test");
  Node<String> it = c.add(strings[0]);
  log("Added " + c.toString());
  c.remove(it, strings[0]);
  log("Removed " + c.toString());

  // Capacity test.
  log(true, "Capacity test");
  ArrayList<Node<String>> nodes = new ArrayList<Node<String>>(strings.length);
  // Fill it.
  for (int i = 0; i < strings.length; i++) {
    nodes.add(i, c.add(strings[i]));
    log("Added " + strings[i] + " " + c.toString());
  }
  // Add one more.
  try {
    c.add("Wafer thin mint!");
  } catch (IllegalStateException ise) {
    log("Full!");
  }
  c.clear();
  log("Empty: " + c.toString());

  // Iterate test.
  log(true, "Iterator test");
  for (int i = 0; i < strings.length; i++) {
    nodes.add(i, c.add(strings[i]));
  }
  StringBuilder all = new StringBuilder ();
  Separator sep = new Separator(",");
  for (String s : c) {
    all.append(sep.sep()).append(s);
  }
  log("All: "+all);
  for (int i = 0; i < strings.length; i++) {
    c.remove(nodes.get(i), strings[i]);
  }
  sep.reset();
  all.setLength(0);
  for (String s : c) {
    all.append(sep.sep()).append(s);
  }
  log("None: " + all.toString());

  // Multiple add/remove
  log(true, "Multi test");
  for (int i = 0; i < strings.length; i++) {
    nodes.add(i, c.add(strings[i]));
    log("Added " + strings[i] + " " + c.toString());
  }
  log("Filled " + c.toString());
  for (int i = 0; i < strings.length - 1; i++) {
    c.remove(nodes.get(i), strings[i]);
    log("Removed " + strings[i] + " " + c.toString());
  }
  c.remove(nodes.get(strings.length - 1), strings[strings.length - 1]);
  log("Empty " + c.toString());

  // Multi-threaded add/remove
  log(true, "Threads test");
  c.clear();
  for (int i = 0; i < TEST_THREADS; i++) {
    Thread t = new Thread(new Tester<String>(c, strings[i]));
    t.setName("Tester " + strings[i]);
    log("Starting " + t.getName());
    t.start();
  }
  // Wait for 10 seconds.
  long stop = System.currentTimeMillis() + 10 * 1000;
  while (System.currentTimeMillis() < stop) {
    Thread.sleep(100);
  }
  // Stop the testers.
  testing = false;
  // Wait some more.
  Thread.sleep(1 * 100);
  // Get stats.
  double added = c.totalAdded.doubleValue();
  double skipped = c.totalSkipped.doubleValue();
  //double freed = c.freed.doubleValue();
  log(true, "Stats: added=" + c.totalAdded + ",freed=" + c.totalFreed + ",skipped=" + c.totalSkipped + ",O(" + ((added + skipped) / added) + ")");
}

1
你对这个算法的正确性有任何正式的验证吗?无锁数据结构很难做到正确,除非你避免重用节点... - thkala
@thkala - 你需要多正式的翻译?主要算法在getFree方法中,它选择一个空闲节点并标记为已使用。这很简单,正确性应该是不言自明的。我添加了我的测试方法,也许它们会有所帮助。 - OldCurmudgeon
那种已发表并经过同行评审的算法所具有的“正式”特征。我曾广泛地使用无锁数据结构,它们可能非常难以正确实现。只是有太多边角情况... - thkala
抱歉,但我忍不住要说……在环形结构中不存在角落案例。笑……不过说真的,除了在这里,我还没有正式发布它,也没有进行同行评审。如果您想的话,我可以将其发布到CodeReview上并看看他们会说些什么。我已经在实际环境中使用它相当长时间了,似乎很可靠。我知道它收集的统计数据有些粗糙,但它们不是算法的一部分。 - OldCurmudgeon
插入和删除看起来很好,因为Node.free标志被CAS并且这也确保了Node.element的线程间可见性。然而,可能存在设计问题,这些问题会在迭代器中显示——由于freeelement是独立的字段,当读取时,UsedNodesIterator无法避免竞争问题。我相信大多数java.util.concurrent集合将“element”和“free”状态组合成单个字段,以允许无竞争地读取,并禁止空元素以协助此过程。 - Thomas W

3

也许您想看看Disruptor - 并发编程框架

  • 在这里,找到一篇介绍替代方案、设计以及与java.util.concurrent.ArrayBlockingQueue的性能比较的论文:pdf
  • 考虑阅读BlogsAndArticles中的前三篇文章

如果这个库太过复杂,请使用java.util.concurrent.ArrayBlockingQueue


2
我建议您查看ArrayDeque,或者查看更具并发性的实现,例如Disruptor库,它是Java中最复杂/复杂的环形缓冲区之一。
另一个选择是使用无界队列,这更具并发性,因为生产者永远不需要等待消费者。Java Chronicle 除非您的需求需要复杂度,否则ArrayDeque可能已经足够。

1
一个重要的问题是:ArrayDeque 没有大小限制。它使用循环数组,确实可以调整大小以容纳更多元素。但是,在一段时间之后,操作者必须手动 pop() 一个元素然后才能插入新元素,同时还必须显式地维护线程安全... - thkala
1
如果您需要限制大小,可以使用ArrayBlockingQueue。 - Peter Lawrey
2
ArrayBlockingQueue通过阻塞直到有元素被移除来限制其大小。据我所知,OP希望队列可以隐式地删除或覆盖最旧的元素,只保留最新的N个元素。 - thkala
3
这不是我会做的方式。我更愿意将所有内容存储在数据库中,并明确地清除旧记录。这将带来额外的好处,允许在未来可能需要的更复杂的查询。 - thkala
Disruptor库是Java中最复杂/复杂的环形缓冲区。我认为这是一种观点。那个人根本不需要交接,因此Disruptor不适合这种情况。即使使用交接,扁平组合也会优于生产者争用的Disruptor(在生产者之间可能不是严格的FIFO)。 - bestsss
显示剩余2条评论

1

1
Hazelcast的队列几乎提供了您所需的一切,但不支持循环。但根据您的描述,我不确定您是否真正需要它。

0

如果是我,我会像你所说的那样使用CircularFIFOBuffer,并在写入(添加)时对缓冲区进行同步。当监视应用程序想要读取缓冲区时,在缓冲区上同步,然后复制或克隆它以用于报告。

这个建议是基于假设延迟很小,可以将缓冲区复制/克隆到一个新对象中。如果元素数量很大,复制时间很慢,那么这不是一个好主意。

伪代码示例:

public void writeRequest(String requestID) {
    synchronized(buffer) {
       buffer.add(requestID);
    }
}

public Collection<String> getRequests() {
     synchronized(buffer) {
        return buffer.clone();
     }
}

0

鉴于您特别要求将编写者(即Web服务器)的优先级高于读者(即监控),我建议采用以下设计。

Web服务器将请求信息添加到并发队列中,由专用线程读取,该线程将请求添加到线程本地(因此非同步)队列中,覆盖最旧的元素,如EvictingQueueCircularFifoQueue。 同一线程在处理每个请求后检查一个标志,该标志指示是否已经请求了报告,并且如果为正,则从线程本地队列中的所有元素生成报告。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接