Java并发编程: 多个写者,一个读者

17

我需要在我的软件中收集一些统计数据,并尝试使其快速和正确,这对我来说并不容易!

首先是我的代码,目前有两个类,一个是StatsService,另一个是StatsHarvester。

public class StatsService
{
private Map<String, Long>   stats   = new HashMap<String, Long>(1000);

public void notify ( String key )
{
    Long value = 1l;
    synchronized (stats)
    {
        if (stats.containsKey(key))
        {
            value = stats.get(key) + 1;
        }
        stats.put(key, value);
    }
}

public Map<String, Long> getStats ( )
{
    Map<String, Long> copy;
    synchronized (stats)
    {
        copy = new HashMap<String, Long>(stats);
        stats.clear();
    }
    return copy;
}
}

这是我的第二个类,一个收集统计数据并定时将其写入数据库的收割机。

public class StatsHarvester implements Runnable
{
private StatsService    statsService;
private Thread          t;

public void init ( )
{
    t = new Thread(this);
    t.start();
}

public synchronized void run ( )
{
    while (true)
    {
        try
        {
            wait(5 * 60 * 1000); // 5 minutes
            collectAndSave();
        }
        catch (InterruptedException e)
        {
            e.printStackTrace();
        }
    }
}

private void collectAndSave ( )
{
    Map<String, Long> stats = statsService.getStats();
    // do something like:
    // saveRecords(stats);
}
}

在运行时,它将有大约30个并发运行的线程,每个线程调用notify(key)约100次。只有一个StatsHarvester正在调用statsService.getStats()

所以我有很多写入者和只有一个读取者。准确的统计数据是不错的,但如果在高并发情况下丢失一些记录,我并不在意。

阅读器应该每5分钟运行一次或者任何合理的时间。

写入应尽可能快。阅读应该快速,但如果每5分钟锁定约300ms,那也没关系。

我已经阅读了许多文档(Java并发实践,effective java等),但我强烈感觉需要您的建议来正确处理它。

我希望我表述的问题足够清晰简洁,以获得有价值的帮助。


编辑

感谢大家提供详细和有帮助的答案。正如我所预期的那样,有多种方法可以做到这一点。

我测试了大多数提议(我理解的那些)并上传了一个测试项目到Google Code以供进一步参考(Maven项目)

http://code.google.com/p/javastats/

我已经测试了不同的StatsService实现

  • HashMapStatsService (HMSS)
  • ConcurrentHashMapStatsService (CHMSS)
  • LinkedQueueStatsService (LQSS)
  • GoogleStatsService (GSS)
  • ExecutorConcurrentHashMapStatsService (ECHMSS)
  • ExecutorHashMapStatsService (EHMSS)

我使用x个线程进行测试,每个线程调用notify方法y次,结果以毫秒为单位。

         10,100   10,1000  10,5000  50,100   50,1000  50,5000  100,100  100,1000 100,5000 
GSS       1        5        17       7        21       117      7        37       254       Summe: 466
ECHMSS    1        6        21       5        32       132      8        54       249       Summe: 508
HMSS      1        8        45       8        52       233      11       103      449       Summe: 910
EHMSS     1        5        24       7        31       113      8        67       235       Summe: 491
CHMSS     1        2        9        3        11       40       7        26       72        Summe: 171
LQSS      0        3        11       3        16       56       6        27       144       Summe: 266

此时我认为我会使用ConcurrentHashMap,因为它提供了良好的性能,同时也很容易理解。

感谢您的所有建议! Janning


1
我认为在持有对象锁时更改对象状态不是一个好主意。此外,在StatsHarvester中,没有必要同步run()方法。 - matt b
1
下面代码中的synchronized关键字(在CHMSS中)似乎是不必要的。除非您正在使用多个线程同时收集统计信息(显然,如果是这样,那么似乎很奇怪)。即使您在那里放置synchronized,它也无法阻止线程调用notify()。如果是我,我就不会费心,因为统计数据不必“100%”准确。如果您使用CHM,则无法锁定整个映射。synchronized (stats) { copy = new HashMap(stats); stats.clear(); } return copy; - Enno Shioji
9个回答

16

正如Jack所说,您可以使用java.util.concurrent库,其中包括ConcurrentHashMap和AtomicLong。您可以将AtomicLong放入"如果不存在则添加"中,否则,您可以增加其值。由于AtomicLong是线程安全的,因此您可以安全地增加变量而不必担心并发问题。

public void notify(String key) {
    AtomicLong value = stats.get(key);
    if (value == null) {
        value = stats.putIfAbsent(key, new AtomicLong(1));
    }
    if (value != null) {
        value.incrementAndGet();
    }
}

这应该同时快速又线程安全

编辑:稍微重构了一下,所以最多只有两个查找。


1
@BalusC:文档中说它返回“与指定键关联的先前值;如果没有为该键建立映射,则返回null”。您的一行代码会在第一次通知特定键时抛出NullPointerException异常。 - Ben Lings
@BalausC 那不是原子操作。如果 stats.put 在你的 if 和 stats.put 之间被另一个线程替换了,会怎么样? - John Vint
3
根据 javadoc,您不需要第一次调用 stats.get 或第一次空检查。 putIfAbsent 方法将返回先前映射的值(在这种情况下应增加),或者如果没有先前的值,则返回 null,并插入提供的 AtomicLong(1)。 - Sean Reilly
1
是的,然而,持续调用putIfAbsent并不是使用ConcurrentHashMap的正确方式。对于每个单独的putIfAbsent,您将锁定与hashCode相关联的单个条目。您正在进行不必要的锁定,而如果您成功地进行初始获取,则永远不会阻塞。 - John Vint
1
@johnvint,您提供的代码存在问题 - 如果valuenull,则putIfAbsent也将返回null,因此value将保持为null。因此,现在编写的代码是正确的,但具有误导性。您可以选择不将putIfAbsent的返回值分配给value,或者将value初始化为某个真实值。 - SatA
显示剩余10条评论

8
为什么不使用java.util.concurrent.ConcurrentHashMap<K, V>?它在内部处理一切,避免了对映射的无用锁定,节省了大量工作:您不必关心get和put上的同步。
来自文档的内容:
支持检索的完全并发和可调整预期并发更新的哈希表。这个类遵守Hashtable相同的功能规范,并包括与Hashtable每个方法对应的版本的方法。然而,即使所有操作都是线程安全的,检索操作也不涉及锁定,并且没有任何支持以方式锁定整个表,以防止所有访问。
您可以指定其并发级别:
允许更新操作并发的数量由可选的concurrencyLevel构造函数参数(默认为16)指导,该参数用作内部大小的提示。表被内部分区以尝试允许指定数量的并发更新而不产生竞争。由于在哈希表中的放置本质上是随机的,实际并发性将有所变化。理想情况下,您应该选择一个值来容纳尽可能多的线程并发修改表格。使用比您需要的明显高的值可能会浪费空间和时间,而使用比您需要的明显低的值可能会导致线程争用。但是,数量级内的过高估计和低估计通常没有太大的影响。当已知仅有一个线程将修改且所有其他线程将只读取时,值为1是适当的。此外,调整此类哈希表或任何其他类型的哈希表的大小是一个相对较慢的操作,因此,在可能的情况下,在构造函数中提供预期表格大小的估计是一个好主意。

建议仔细阅读ConcurrentHashMap文档,特别是关于原子或非原子操作的说明。

为了确保原子性,您应考虑哪些操作是原子的,从ConcurrentMap接口中,您将会知道:

V putIfAbsent(K key, V value)
V replace(K key, V value)
boolean replace(K key,V oldValue, V newValue)
boolean remove(Object key, Object value)

可以安全使用。


值得注意的是,这种方法很容易导致丢失更新。 - Ben Lings
1
根据 Map 合约,它不应该允许丢失更新,同时假设 putIfAbsent 是原子执行的。 - Jack
2
为了确保这一点,您需要循环直到replace(key, currentValue, currentValue+1)返回true。 - Ben Lings
你可能想在主帖中提到putIfAbsent。虽然ConcurrentHashMap的单个操作是线程安全的,但复合操作自然不是。 - matt b
4
除了考虑使用AtomicLong/AtomicInteger作为地图的值之外,这还应该被考虑,这将消除丢失增量的问题(除非在初始填充时存在潜在问题,除非所有键都是预填充的或者像马特·B所指出的那样包含对putIfAbsent的调用)。 - M. Jessup

6

我建议看一下Java的util.concurrent库。我认为您可以更加简洁地实现这个解决方案。我觉得这里根本不需要使用map。我建议使用ConcurrentLinkedQueue来实现它。每个“生产者”可以自由地向此队列写入,而无需担心其他人。它可以将带有其统计数据的对象放在队列中。

收割机可以连续消耗队列,拉取数据并处理它。然后,它可以以任何需要的方式存储数据。


4

Chris Dail的回答看起来是一个不错的方法。

另一种选择是使用并发的Multiset。在Google Collections库中有一个。您可以按照以下方式使用它:

private Multiset<String> stats = ConcurrentHashMultiset.create();

public void notify ( String key )
{
    stats.add(key, 1);
}

查看源代码,这是使用ConcurrentHashMap实现的,并使用putIfAbsent和三个参数的replace来检测并发修改并重试。


3
采用不同的方法来解决问题,利用线程封闭的(微不足道的)线程安全性。基本上创建一个单一的后台线程来负责读写。在可扩展性和简单性方面具有相当好的特点。
这个想法是,而不是所有的线程都试图直接更新数据,它们会为后台线程生成一个“更新”任务进行处理。同样的线程也可以做读取任务,只要在处理更新时有些滞后是可以容忍的。
这种设计非常好,因为线程将不再竞争锁以更新数据,并且由于Map被限制在单个线程中,您可以简单地使用一个普通的HashMap来执行get/put等操作。在实现方面,这意味着创建一个单线程执行器,并提交可能还执行可选的“收集并保存”操作的写入任务。
代码草图可能如下所示:
public class StatsService {
    private ExecutorService executor = Executors.newSingleThreadExecutor();
    private final Map<String,Long> stats = new HashMap<String,Long>();

    public void notify(final String key) {
        Runnable r = new Runnable() {
            public void run() {
                Long value = stats.get(key);
                if (value == null) {
                    value = 1L;
                } else {
                    value++;
                }
                stats.put(key, value);
                // do the optional collectAndSave periodically
                if (timeToDoCollectAndSave()) {
                    collectAndSave();
                }
            }
        };
        executor.execute(r);
    }
}

与执行器相关联的BlockingQueue,每个为StatsService生成任务的线程都使用该BlockingQueue。关键点在于:此操作的锁定持续时间应比原始代码中的锁定持续时间短得多,因此争用应该少得多。总体而言,这应该会导致更好的吞吐量和延迟。

另一个好处是,由于只有一个线程读取和写入映射,因此可以使用普通的HashMap和原始的long类型(不涉及ConcurrentHashMap或原子类型)。这也大大简化了实际处理它的代码。

希望能对你有所帮助。


对不起,我不理解你的帖子。但是,你不是在把问题转移到另一个地方吗?当你说“生成更新任务”时,你必须将其放入地图或队列中,无论你喜欢哪种数据结构。而且你必须同步这个过程。 - Janning Vygen
是的,清晰多了。非常感谢。我已经为你的答案点赞了。我将测试你的方法,但我认为BlockingQueue必须以某种方式进行同步。因此,你将同步从StatsService移动到ExecutorService中。但我会检查它并编辑我的帖子以使我的结果可用。 - Janning Vygen
这是一个非常低效的解决方案。此外,在增量之后您还缺少了一个 put - finnw
@finnw 感谢您指出代码中的错误。我已经纠正了它。至于解决方案的有效性,(和生活中的大多数事情一样)这取决于情况。与使用ConcurrentHashMap(因此不进行同步)相比,它并不表现更好。但是,如果原始解决方案涉及同步,则这是一个非常有用的替代方案。基本上,它是问题的异步解决方案。显然,它最小化了生成这些任务的线程的延迟。其次,它消除了与锁定相关的锁争用。 - sjlee

1
你看过ScheduledThreadPoolExecutor吗?你可以使用它来安排你的写入器,这些写入器都可以写入并发集合,例如@Chris Dail提到的ConcurrentLinkedQueue。你可以独立地安排一个作业来在需要时从队列中读取,Java SDK应该处理几乎所有的并发问题,无需手动锁定。

0

另一种实现两种方法的替代方案是使用ReentranReadWriteLock。如果您需要清除计数器,则此实现可保护getStats方法免受竞态条件的影响。此外,它还将可变的AtomicLong从getStats中移除,并使用不可变的Long。

public class StatsService {

    private final Map<String, AtomicLong> stats = new HashMap<String, AtomicLong>(1000);
    private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
    private final Lock r = rwl.readLock();
    private final Lock w = rwl.writeLock();

    public void  notify(final String key) {
        r.lock();
        AtomicLong count = stats.get(key);
        if (count == null) {
            r.unlock();
            w.lock();
            count = stats.get(key);
            if(count == null) { 
                count = new AtomicLong();
                stats.put(key, count);
            }
            r.lock();
            w.unlock();
        }
        count.incrementAndGet();
        r.unlock();
    }

    public Map<String, Long> getStats() {
        w.lock();

        Map<String, Long> copy = new HashMap<String, Long>();
        for(Entry<String,AtomicLong> entry : stats.entrySet() ){
                copy.put(entry.getKey(), entry.getValue().longValue());
        }
        stats.clear();
        w.unlock();

        return copy;
    }
}

希望这有所帮助,欢迎任何评论!


在执行w.unlock()之前调用r.lock()会导致死锁。 - Victor Khovanskiy

0

以下是如何在最小化对被测量的线程性能影响的情况下完成它。这是Java中可能的最快解决方案,而无需使用特殊硬件寄存器进行性能计数。

每个线程都独立地输出其统计信息,即不进行同步,输出到某个统计对象中。将包含计数的字段设为volatile,以便进行内存栅栏:

class Stats
{
   public volatile long count;
}

class SomeRunnable implements Runnable
{
   public void run()
   {
     doStuff();
     stats.count++;
   }
}

有另一个线程,它持有对所有Stats对象的引用,定期遍历它们并将所有线程中的计数相加:
public long accumulateStats()
{
   long count = previousCount;

   for (Stats stat : allStats)
   {
       count += stat.count;
   }

   long resultDelta = count - previousCount;
   previousCount = count;

   return resultDelta;
}

这个收集器线程也需要添加一个sleep()(或其他节流)函数。例如,它可以定期向控制台输出每秒计数,以便为您提供应用程序性能的“实时”视图。

这样可以尽量避免同步开销。

另一个要考虑的技巧是将Stats对象填充到128(或256字节在SandyBridge或更高版本上),以便将不同线程的计数保留在不同的缓存行上,否则CPU上会有缓存争用。

当只有一个线程读取和写入时,您不需要锁或原子操作,volatile就足够了。当统计信息读取线程与被测量线程的CPU缓存行交互时,仍然会存在一些线程争用。这是无法避免的,但这是以对运行线程影响最小的方式来完成的;每秒或更少地读取统计信息。


你可能会喜欢这个 https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/atomic/LongAdder.html - John Vint
谢谢,我还没有真正了解Java 8。与我的解决方案进行比较将是很有趣的,因为我的解决方案非常简单,可以检查是否存在任何额外的开销。一个可能不同的事情是线程计数,它们是否保存在ThreadLocal中?我通过让每个线程显式创建自己的Stats对象来避免这种情况。使用起来不太透明/简单,但避免了使用ThreadLocal(不是说ThreadLocal现在如此缓慢)。 - user2800708
这个答案应该得到一些赞,因为它比其他答案快得多。我想在许多情况下,极轻量级的统计信息收集可能并不那么重要,因为被测量的任务可能非常耗时和重量级。但是,如果您真的需要尽可能少地减慢代码的统计信息收集速度,这就是做法。 - user2800708
你在吞吐量上获得的收益会损失在一致性上。这是一个经典的权衡。 如果你需要的是大致值而不是恰好在某个时间点的确切值,那么这种解决方案最好。StampedLock文档甚至也有类似的说法。 - John Vint
没关系。你可以用锁来停止世界以获得“精确”的值,但是锁的获取方式无论如何都是非确定性的,因此结果不比我的更精确。重要的是计数不会被计算两次或遗漏,并且volatile上的内存栅栏确保对其的读写是一致的。基本上,如果你有一个线程在读取和一个线程在写入,你不需要锁或原子操作,volatile就足够了。 - user2800708

0
如果我们忽略收割部分,只关注写入,那么程序的主要瓶颈在于统计数据被锁定在非常粗糙的粒度水平上。如果两个线程想要更新不同的键,它们必须等待。
如果您已经预先知道键集,并且可以初始化映射,以便在更新线程到达时保证键存在,那么您将能够对累加器变量进行锁定,而不是整个映射,或使用线程安全的累加器对象。
而不是自己实现这一点,有些映射实现是专门为并发设计的,并为您执行更细粒度的锁定。
一个注意事项是数据统计,因为您需要同时获取所有累加器的锁定。如果您使用现有的并发友好映射,可能会有一个获取快照的构造。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接