使用LinkedBlockingQueue和flush到mysql

3
LinkedBlockingQueue是否适合以下需求:
1. insert strings (maximum 1024 bytes) into the queue at a very high rate
2. every x inserts or based on a timed interval, flush items into mysql

在清空队列时,我正在查看API:http://docs.oracle.com/javase/1.5.0/docs/api/java/util/concurrent/LinkedBlockingQueue.html

我在想drainTo是否是一个好选择,因为在清空前,我需要进行聚合。

所以我将从队列中取出项目,然后迭代和聚合,最后写入mysql。

这对每秒多达10K个写入器是否适用?

我需要考虑任何锁定/同步问题吗,还是已经处理了?

我将把这个linkedblockingqueue作为concurrenthashmap中的值存储。

项目将永远不会从哈希映射中删除,仅在不存在时被插入,并且如果存在,则将附加到队列末端。


你为什么在那里使用 ConcurrentHashMap?这个 map 中的键是什么? - Gray
MySQL插入线程是1个还是多个?每个队列一个还是总共只有1个? - Gray
@Gray,键将是客户端ID,对于写入,我可以简化它,并只使用单个线程进行刷新。 - codecompleting
每个队列可能更容易拥有一个线程。请随意在下面的答案中添加评论。 - Gray
2个回答

3

这要看插入器是针对每个队列还是所有队列。如果我理解您的规格,我认为以下内容可能有效。

编写者将项目添加到地图中一个LinkedBlockingQueue集合。如果队列大小超过X(如果您希望它是每个队列),则会发出MySQL插入器线程信号。应该可以使用类似以下内容:

queue.add(newItem);
// race conditions here that may cause multiple signals but that's ok
if (queue.size() > 1000) {
    // this will work if there is 1 inserter per queue
    synchronized (queue) {
        queue.notify();
    }
}
...

然后插入器在队列上等待,并进入以下类似的循环:
List insertList = new ArrayList();
while (!done) {
    synchronized (queue) {
        // typically this would be while but if we are notified or timeout we insert
        if (queue.size() < 1000) {
            queue.wait(MILLIS_TIME_INTERVAL);
        }
    }
    queue.drainTo(insertList);
    // insert them into the db
    insertList.clear();
}

如果只有一个线程在所有队列中进行插入,情况会变得更加复杂。我猜问题是为什么你需要使用ConcurrentHashMap?如果你只有一个插入器,例如插入到多个表中,那么你将需要一种机制来通知插入器哪些队列需要被处理。它可以遍历地图中的所有队列,但这可能很昂贵。你需要在某个全局锁对象或者map对象上同步,而不是在队列上。
另外,正如@Peter Lawrey提到的,如果数据库比写入者慢,你很快就会耗尽内存,因此请确保设置了适当的队列容量,以限制写入者并降低工作内存的使用。
希望这能帮助你。

1

对于每个队列,您需要一个线程和一个连接,因此我不会创建太多队列。如果您的MySQL服务器可以处理,您可以每秒执行超过10K次写入操作(只有在测试时才知道)。LinkedBlockingQueue是线程安全的,只要在启动之前创建了所有队列,就不需要任何锁定/同步。

如果您正在以每秒10K个字符长的字符串进行插入,则很可能很快就会耗尽内存(每小时高达36 GB)。相反,我会让数据库仅插入新字符串。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接