如何将Couchbase用作先进先出队列(FIFO队列)

3

使用Java客户端,我该如何使用couchbase实现FIFO队列并确保线程安全?队列中可能有多个线程在弹出和推入。队列中的每个对象都是一个字符串数组。

2个回答

4
Couchbase没有内置创建队列的功能,但您可以自己实现。
下面我将以简短的示例来说明如何实现。我们有一个名为“queue”的队列,它将具有名称为“item:<index>” 的项目。要实现队列,您需要使用类似以下格式的键存储您的值:<queue_name>:item:<index>,其中索引将是单独的键queue:index,在推入队列时需要递增,弹出时需要递减。
在Couchbase中,您可以使用递增和递减操作来实现队列,因为这些操作是原子和线程安全的。
因此,您的push和pop函数的代码将如下所示:
void push(string queue, string[] value){
  int index = couchbase.increment(queue + ':index');
  couchbase.set(queue + ':item:' + index, value);
}
string[] pop(string queue){
  int index = couchbase.get(queue + ':index');
  string[] result = couchbase.get(queue + ':item:' + index);
  couchbase.decrement(queue + ':index');
  return result;
}

抱歉关于代码的部分,我很久以前使用过Java和Couchbase Java客户端。如果现在Java客户端有回调函数,就像Node.js客户端一样,那么您可以重写该代码以使用回调函数。我认为这会更好。
此外,您可以在设置操作中添加额外的检查 - 使用“add”(在C#客户端中称为“StoreMode.Add”)操作,如果具有给定键的项已经存在,则会抛出异常。然后您可以捕获该异常并为相同的参数再次调用push函数。
更新:抱歉,那天早上太早了,所以我没想清楚。对于FIFO,正如@avsej所说,您需要两个计数器:queue:headqueue:tail。 所以对于FIFO:
void push(string queue, string[] value){
  int index = couchbase.increment(queue + ':tail');
  couchbase.set(queue + ':item:' + index, value);
}
string[] pop(string queue){
  int index = couchbase.increment(queue + ':head') - 1;
  string[] result = couchbase.get(queue + ':item:' + index);
  return result;
}

注意:代码的外观可能会因queue:tailqueue:head的起始值不同而略有不同(它将是零、一或其他)。此外,您可以为计数器设置一些max值,在达到该值后,queue:tailqueue:head将被重置为0(仅限于文档数量的限制)。如果您确实需要,还可以为每个文档设置expire值。

你的代码示例展示了栈,但是同样的思路也可以应用于队列,只不过需要两个索引,例如 queue + ':head'queue + ':tail',在 push 操作时需要增加 tail,在 pop 操作时需要减少 head。 - avsej
@m03geek 这会每次弹出最后插入的对象吗?[@]avsej 但要使用哪个头的值呢? - twb
@twb 是的,它会弹出最后一个对象。 - m03geek
我已经更新了答案。对我来说看起来是正确的。如果不是,请纠正我,@avsej,因为我不明白为什么弹出应该将头部减少... - m03geek
我意识到需要处理弹出操作比压入操作更快的情况。TTL 是任何将被使用的东西,因此也需要处理。 - twb

0
Couchbase已经拥有CouchbaseQueue数据结构。
示例用法:取自下面的SDK文档。
Queue<String> shoppingList = new CouchbaseQueue<String>("queueDocId", collection, String.class, QueueOptions.queueOptions());
shoppingList.add("loaf of bread");
shoppingList.add("container of milk");
shoppingList.add("stick of butter");

// What does the JSON document look like?
System.out.println(collection.get("queueDocId").contentAsArray());
//=> ["stick of butter","container of milk","loaf of bread"]

String item;
while ((item = shoppingList.poll()) != null) {
  System.out.println(item);
  // => loaf of bread
  // => container of milk
  // => stick of butter
}

// What does the JSON document look like after draining the queue?
System.out.println(collection.get("queueDocId").contentAsArray());
//=> []

Java SDK 3.1 CouchbaseQueue Doc

的意思是:Java SDK 3.1 CouchbaseQueue 文档。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接