Kotlin并发编程之ConcurrentHashMap

8

我正在尝试支持对一个定期清除的哈希表进行并发操作。我有一个缓存,存储一段时间内的数据。每隔5分钟,缓存中的数据会被发送到服务器。一旦我刷新了缓存,就想清空它。问题是,当我在执行这个操作时,可能会有数据被写入到这个映射表中,使用了已存在的键。如何使这个过程线程安全?

data class A(val a: AtomicLong, val b: AtomicLong) {
   fun changeA() {
      a.incrementAndGet()
   }
}

class Flusher {
   private val cache: Map<String, A> = ConcurrentHashMap()
   private val lock = Any()
   fun retrieveA(key: String){
       synchronized(lock) {
          return cache.getOrPut(key) { A(key, 1) }
       }
   }
 
   fun flush() {
      synchronized(lock) {
           // send data to network request
           cache.clear()
      }
   }
}

// Existence of multiple classes like CacheChanger
class CacheChanger{
  fun incrementData(){
      flusher.retrieveA("x").changeA()
  }
}

我担心上述缓存没有得到正确同步。有更好/正确的方式来锁定这个缓存,以便我不会丢失数据吗?我应该创建一个缓存的深拷贝并清空它吗?

由于上述数据可能会被另一个修改者更改,这难道不会导致问题吗?


除了retrieve和flush之外,还有哪些函数会修改地图?这两个函数都在同一个锁上同步,那么你害怕的问题是什么? - ciamej
问题在于类A的值是可以被改变的。如果类A的值被改变了,而我又将其清除,那该怎么办呢?我会更新这个例子。 - LateNightDev
@michalik,不能安全地摆脱锁定,因为flush需要是原子性的 - 需要读取整个映射然后清除,而且在此过程中不能交错任何写入。 - ciamej
我已经更新了这个类,并加入了示例。 - LateNightDev
你可能需要将 cache 的类型设置为 ConcurrentMap 才能获得适当的并发行为。 - Louis Wasserman
显示剩余2条评论
2个回答

3

你可以解除锁定。

在 flush 方法中,不要通过迭代器读取整个 map 然后清空它,而是逐个删除每个元素。

我不确定是否可以使用迭代器的 remove 方法(我会立即检查),但你可以获取 keyset 并迭代它,对于每个键调用 cache.remove() - 这将给你存储的值并原子性地从缓存中删除它。

关键部分是如何确保 A 类对象在发送到网络之前不会被修改...您可以按以下方式操作:

当你通过 retrieveA 获取一些 x 并修改对象时,你需要确保它仍然在缓存中。简单地再次调用 retrieve。如果你得到完全相同的对象,那就没问题了。如果不同,则意味着该对象已被移除并发送到网络,但你不知道修改是否也已被发送,或者修改之前的对象状态是否已发送。不过,我认为在你的情况下,你可以简单地重复整个过程(应用更改并检查对象是否相同)。但这取决于你的应用程序的具体情况。

如果你不想增加两次,那么在将数据发送到网络时,你将需要读取计数器 a 的内容,将其存储在某个局部变量中,并将 a 减少该量(通常它将变为零)。然后在 CacheChanger 中,当你从第二个 retrieve 获取不同的对象时,你可以检查该值是否为零(你的修改已被考虑),或者非零表示你的修改稍微晚了一点,你需要重复该过程。

你也可以用 compareAndSwap 替换 incrementAndGet,但这可能会导致略微较差的性能。在这种方法中,你尝试交换一个大于当前值的值。并且在发送到网络之前,你尝试将该值交换为-1以表示该值无效。如果第二次交换失败,则表示有人同时更改了该值,你需要再次检查以发送最新的值,并在循环中重复该过程(仅在交换到 -1 成功后才中断循环)。在增量为一的交换情况下,如果交换失败,你也需要在循环中重复该过程,直到交换成功。如果失败,则表示其他人已经交换到某个更大的值,或者刷新器已经交换到 -1。在后一种情况下,你知道你需要再调用 retrieveA 一次以获取新对象。


为什么需要遍历所有条目并模仿ConcurrentHashMap :: clear方法的行为,当它已经提供并保证与ConcurrentHashMap中的其他方法一样线程安全?你基本上正在放弃ConcurrentHashMap的好处,而选择外部同步。 - Michał Krzywański
@michalik 因为 clear 不会返回 map 的内容。你想要原子地获取内容并同时删除它们。 - ciamej
我错过了有关通过网络发送数据的评论。在这种情况下,您的解决方案将起作用,但OP的解决方案(使用锁)也将起作用(只要需要保留这两个操作的原子性),但在修改值时他还需要持有锁(例如通过在“Flusher”中公开一个用于修改的方法)。此外,您描述的解决方案(似乎是一种CAS算法类型)可能需要“CacheChanger”在数据更改的情况下进行CAS检查。但总体而言,它确实取决于应用程序的具体情况。 - Michał Krzywański
1
@michalk 是的,对于原问题,另一种替代方案是保留锁定状态,将Map更改为普通Map(不需要ConcurrentHashMap),但在执行方法检索时应用所有修改,同时仍保持锁定状态。 - ciamej
@michalk 好的,不用理会我上次的评论了,我已经自己完成了。 - ciamej
显示剩余2条评论

0

最简单的解决方案(但性能较差)是完全依赖锁。

您可以将ConcurrentHashMap更改为常规 HashMap

然后,您必须直接在函数retrieve中应用所有更改:

fun retrieveA(key: String, mod: (A) -> Unit): A {
    synchronized(lock) {
        val obj: A = cache.getOrPut(key) { A(key, 1) }
        mod(obj)
        cache.put(obj)
        return obj
    }
}

我希望它能编译通过(我不是 Kotlin 专家)。

然后你可以像这样使用它:

class CacheChanger {
    fun incrementData() {
        flusher.retrieveA("x") { it.changeA() }
    }
}

好吧,我承认这段代码并不是真正的 Kotlin ;) 你应该使用 Kotlin lambda 而不是 Consumer 接口。我已经有一段时间没有玩过 Kotlin 了。如果有人能修复它,我将非常感激。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接