这是一个无锁环形缓冲区的实现。它实现的是一个固定大小的缓冲区——不提供 FIFO 功能。我建议您为每个服务器存储一个请求的 Collection,这样就可以由报告来做过滤,而不必让数据结构来做过滤。
public class Container<T> implements Iterable<T> {
final int capacity;
AtomicReference<Node<T>> head = new AtomicReference<Node<T>>();
AtomicLong totalAdded = new AtomicLong(0);
AtomicLong totalFreed = new AtomicLong(0);
AtomicLong totalSkipped = new AtomicLong(0);
private void resetStats() {
totalAdded.set(0);
totalFreed.set(0);
totalSkipped.set(0);
}
public Container(int capacity) {
this.capacity = capacity;
Node<T> h = new Node<T>();
Node<T> it = h;
for (int i = 0; i < capacity - 1; i++) {
it.next = new Node<T>();
it = it.next;
}
it.next = h;
head.set(h);
}
public void clear() {
Node<T> it = head.get();
for (int i = 0; i < capacity; i++) {
it.element = null;
it.free.set(true);
it = it.next;
}
resetStats();
}
public Node<T> add(T element) {
totalAdded.incrementAndGet();
return getFree().attach(element);
}
private Node<T> getFree() {
Node<T> freeNode = head.get();
int skipped = 0;
while (skipped < capacity && !freeNode.free.compareAndSet(true, false)) {
skipped += 1;
freeNode = freeNode.next;
}
totalSkipped.addAndGet(skipped);
if (skipped < capacity) {
head.set(freeNode.next);
} else {
throw new IllegalStateException("Capacity exhausted.");
}
return freeNode;
}
public void remove(Node<T> it, T element) {
totalFreed.incrementAndGet();
it.detach(element);
if (!it.free.compareAndSet(false, true)) {
throw new IllegalStateException("Freeing a freed node.");
}
}
public static class Node<T> {
private T element;
private AtomicBoolean free = new AtomicBoolean(true);
private Node<T> next;
private Node() {
element = null;
}
public Node<T> attach(T element) {
if (this.element == null) {
this.element = element;
} else {
throw new IllegalArgumentException("There is already an element attached.");
}
return this;
}
public Node<T> detach(T element) {
if (this.element == element) {
this.element = null;
} else {
throw new IllegalArgumentException("Removal of wrong element.");
}
return this;
}
public T get () {
return element;
}
@Override
public String toString() {
return element != null ? element.toString() : "null";
}
}
public Iterator<T> iterator() {
return new UsedNodesIterator<T>(this);
}
private static class UsedNodesIterator<T> implements Iterator<T> {
Node<T> it;
int limit = 0;
T next = null;
public UsedNodesIterator(Container<T> c) {
it = c.head.get();
limit = c.capacity;
}
public boolean hasNext() {
while (next == null && limit > 0) {
while (limit > 0 && it.free.get() == true) {
it = it.next;
limit -= 1;
}
if (limit != 0) {
next = it.element;
}
}
return next != null;
}
public T next() {
T n = null;
if ( hasNext () ) {
n = next;
next = null;
it = it.next;
limit -= 1;
} else {
throw new NoSuchElementException ();
}
return n;
}
public void remove() {
throw new UnsupportedOperationException("Not supported.");
}
}
@Override
public String toString() {
StringBuilder s = new StringBuilder();
Separator comma = new Separator(",");
int usedCount = 0;
int freeCount = 0;
Node<T> it = head.get();
int count = 0;
s.append("[");
while (count < capacity) {
if (it.free.get() == false) {
T e = it.element;
if (e != null) {
s.append(comma.sep()).append(e.toString());
usedCount += 1;
} else {
freeCount += 1;
}
} else {
freeCount += 1;
}
it = it.next;
count += 1;
}
s.append("]").append(usedCount).append("+").append(freeCount);
if (usedCount + freeCount != capacity) {
s.append("?");
}
return s.toString();
}
}
请注意,put 和 get 的开销非常接近 O(1)。代码中的 Separator("分隔符")在第一次调用 sep() 时返回空字符串 "",此后每次调用都返回构造时传入的参数。
编辑:添加了测试方法。
// Master switch for the log(...) helpers: nothing is printed or written while this is false.
private static boolean Debug = false;
// File name handed to the log sink for every line written.
private final static String logName = "Container.log";
// Log sink. NOTE(review): the constructor argument looks like the directory the
// log file is created in — confirm against NamedFileOutput.
private final static NamedFileOutput log = new NamedFileOutput("C:\\Junk\\");
/**
 * Writes one line to the log file and optionally echoes it on standard
 * output. Does nothing unless {@code Debug} is set.
 *
 * @param toStdoutToo whether to print the line to {@code System.out} as well
 * @param s           the line to log
 */
private static synchronized void log(boolean toStdoutToo, String s) {
    if (!Debug) {
        return;
    }
    if (toStdoutToo) {
        System.out.println(s);
    }
    log(s);
}
/**
 * Writes one line to the log file named by {@code logName}. Does nothing
 * unless {@code Debug} is set. An I/O failure is reported via its stack
 * trace and otherwise ignored.
 *
 * @param s the line to log
 */
private static synchronized void log(String s) {
    if (!Debug) {
        return;
    }
    try {
        log.writeLn(logName, s);
    } catch (IOException ex) {
        ex.printStackTrace();
    }
}
// Shared run flag for the Tester threads: each one loops while this is true.
// volatile so that the write that clears it is seen by the running testers.
static volatile boolean testing = true;
/**
 * Worker for the threads test: while the shared {@code testing} flag is set
 * it adds its own value to the container, logs, removes the value again and
 * logs once more.
 *
 * @param <T> element type of the container under test
 */
static class Tester<T> implements Runnable {
    /** The value this tester repeatedly adds and removes. */
    final T me;
    /** The container under test. */
    final Container<T> c;
    /** Set when an interrupt arrived during a pause; restored when run() exits. */
    private boolean interrupted = false;

    public Tester(Container<T> container, T name) {
        c = container;
        me = name;
    }

    /**
     * Calls {@code Thread.sleep(0)} between operations. An interrupt ends the
     * whole test by clearing {@code testing}. The interrupt itself is only
     * remembered here, so the current add/remove cycle can still finish and
     * log normally.
     */
    private void pause() {
        try {
            Thread.sleep(0);
        } catch (InterruptedException ex) {
            testing = false;
            interrupted = true;
        }
    }

    public void run() {
        try {
            while (testing) {
                Node<T> n = c.add(me);
                log("Added " + me + ": " + c.toString());
                pause();
                c.remove(n, me);
                log("Removed " + me + ": " + c.toString());
                pause();
            }
        } finally {
            // Do not swallow the interrupt: hand it back to whoever runs this task.
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }
}
// Sample values that main() stores in the container and hands to the tester threads.
static final String[] strings = {
"One", "Two", "Three", "Four", "Five",
"Six", "Seven", "Eight", "Nine", "Ten"
};
// Number of tester threads: at most 10 and never more than there are sample strings.
static final int TEST_THREADS = Math.min(10, strings.length);
/**
 * Manual test driver. Runs, in order: a single add/remove, filling the
 * container to capacity, iteration over a full and an empty container,
 * a fill/drain cycle, and a ten-second multi-threaded add/remove test whose
 * statistics are logged at the end.
 *
 * @param args unused
 * @throws InterruptedException if the main thread is interrupted while waiting
 */
public static void main(String[] args) throws InterruptedException {
    Debug = true;
    log.delete(logName);
    Container<String> c = new Container<String>(10);

    log(true, "Simple test");
    Node<String> it = c.add(strings[0]);
    log("Added " + c.toString());
    c.remove(it, strings[0]);
    log("Removed " + c.toString());

    log(true, "Capacity test");
    ArrayList<Node<String>> nodes = new ArrayList<Node<String>>(strings.length);
    for (int i = 0; i < strings.length; i++) {
        nodes.add(c.add(strings[i]));
        log("Added " + strings[i] + " " + c.toString());
    }
    try {
        // The container is full, so this add is expected to be rejected.
        c.add("Wafer thin mint!");
    } catch (IllegalStateException ise) {
        log("Full!");
    }
    c.clear();
    log("Empty: " + c.toString());

    log(true, "Iterator test");
    // Drop the handles of the previous phase so the list holds exactly the
    // nodes of the current fill, at matching indices.
    nodes.clear();
    for (int i = 0; i < strings.length; i++) {
        nodes.add(c.add(strings[i]));
    }
    StringBuilder all = new StringBuilder ();
    Separator sep = new Separator(",");
    for (String s : c) {
        all.append(sep.sep()).append(s);
    }
    log("All: "+all);
    for (int i = 0; i < strings.length; i++) {
        c.remove(nodes.get(i), strings[i]);
    }
    sep.reset();
    all.setLength(0);
    for (String s : c) {
        all.append(sep.sep()).append(s);
    }
    log("None: " + all.toString());

    log(true, "Multi test");
    nodes.clear();
    for (int i = 0; i < strings.length; i++) {
        nodes.add(c.add(strings[i]));
        log("Added " + strings[i] + " " + c.toString());
    }
    log("Filled " + c.toString());
    for (int i = 0; i < strings.length - 1; i++) {
        c.remove(nodes.get(i), strings[i]);
        log("Removed " + strings[i] + " " + c.toString());
    }
    c.remove(nodes.get(strings.length - 1), strings[strings.length - 1]);
    log("Empty " + c.toString());

    log(true, "Threads test");
    c.clear();
    Thread[] testers = new Thread[TEST_THREADS];
    for (int i = 0; i < TEST_THREADS; i++) {
        Thread t = new Thread(new Tester<String>(c, strings[i]));
        t.setName("Tester " + strings[i]);
        log("Starting " + t.getName());
        t.start();
        testers[i] = t;
    }
    // Let the testers hammer the container for ten seconds.
    Thread.sleep(10 * 1000);
    testing = false;
    // Wait until every tester has really finished, so the statistics below
    // are final rather than a snapshot taken while workers are still running.
    for (Thread t : testers) {
        t.join();
    }
    double added = c.totalAdded.doubleValue();
    double skipped = c.totalSkipped.doubleValue();
    log(true, "Stats: added=" + c.totalAdded + ",freed=" + c.totalFreed + ",skipped=" + c.totalSkipped + ",O(" + ((added + skipped) / added) + ")");
}