在不锁定整个HashMap的情况下,是否可能在线程间共享一个HashMap?

29

我希望在线程之间共享一个结构体。这个结构体有很多字段从未被修改,另外还有一个HashMap是可变的。我不想为了单个的更新/删除操作锁定整个HashMap,所以我的HashMap看起来像HashMap<u8,Mutex<u8>>。虽然这样可以工作,但这没有意义,因为线程最终会锁定整个映射。

这里是不涉及线程的可行版本,我认为这个例子并不需要线程。

use std::collections::HashMap;
use std::sync::{Arc, Mutex};

fn main() {
    let s = Arc::new(Mutex::new(S::new()));
    let z = s.clone();
    let _ = z.lock().unwrap();
}

struct S {
    x: HashMap<u8, Mutex<u8>>, // other non-mutable fields
}

impl S {
    pub fn new() -> S {
        S {
            x: HashMap::default(),
        }
    }
}

Playground

有没有任何方式可以实现这个?文档里有没有我错过的明显东西?

我一直在尝试使其工作,但是我不太确定如何做。基本上,每个示例中都会有一个Mutex(或RwLock,或类似的东西)来保护内部值。


2
为了实现线程安全,没有绕过的方法。必须在整个类型周围设置保护,而不仅仅是它的基础字段。 - squiguy
@Shepmaster 谢谢!太糟糕了,可惜。向地图中插入/删除元素非常少见,大约只有99.9%的更新。这就是为什么我认为我只需锁定地图中的单个元素,并使用某种其他方法来确保在没有任何工作人员使用地图时进行插入/删除。回到思考吧。 - Andrew
@Shepmaster 如果我很蠢,请原谅,但是更新地图中的单个元素仍然需要写锁定吗?或者您的意思是,我应该在实际修改地图之前使用读取锁定,然后将其升级为写入锁定?考虑到实际工作的 ~95%都会以这种方式处于写入锁定状态,那么我是否应该直接使用Mutex来避免RwLock的开销? - Andrew
你的线程是否真的花费了大量时间来访问HashMap?它是以读为主吗? - David Schwartz
3个回答

28
我不认为您的请求是可能的,至少没有一些极度聪明的无锁数据结构;如果多个线程需要插入哈希到同一位置的新值,那么会发生什么?
在以前的工作中,我使用了一个RwLock>>。当将值插入哈希表时,您短暂地获得一个独占锁。其余时间,您可以拥有对HashMap和因此给定元素的多个读取者锁。如果它们需要改变数据,它们可以获得Mutex的排他访问。
以下是示例:
use std::{
    collections::HashMap,
    sync::{Arc, Mutex, RwLock},
    thread,
    time::Duration,
};

fn main() {
    let data = Arc::new(RwLock::new(HashMap::new()));

    let threads: Vec<_> = (0..10)
        .map(|i| {
            let data = Arc::clone(&data);
            thread::spawn(move || worker_thread(i, data))
        })
        .collect();

    for t in threads {
        t.join().expect("Thread panicked");
    }

    println!("{:?}", data);
}

fn worker_thread(id: u8, data: Arc<RwLock<HashMap<u8, Mutex<i32>>>>) {
    loop {
        // Assume that the element already exists
        let map = data.read().expect("RwLock poisoned");

        if let Some(element) = map.get(&id) {
            let mut element = element.lock().expect("Mutex poisoned");

            // Perform our normal work updating a specific element.
            // The entire HashMap only has a read lock, which
            // means that other threads can access it.
            *element += 1;
            thread::sleep(Duration::from_secs(1));

            return;
        }

        // If we got this far, the element doesn't exist

        // Get rid of our read lock and switch to a write lock
        // You want to minimize the time we hold the writer lock
        drop(map);
        let mut map = data.write().expect("RwLock poisoned");

        // We use HashMap::entry to handle the case where another thread 
        // inserted the same key while where were unlocked.
        thread::sleep(Duration::from_millis(50));
        map.entry(id).or_insert_with(|| Mutex::new(0));
        // Let the loop start us over to try again
    }
}

这段代码在我的机器上运行大约需要2.7秒,即使它启动了10个线程,每个线程都持有元素数据的独占锁并等待1秒钟。
然而,这种解决方案并非没有问题。当对那个主锁存在巨大争用时,获取写锁可能需要一段时间,并完全破坏了并行性。
在这种情况下,您可以切换到RwLock<HashMap<K,Arc<Mutex<V>>>>。一旦获得读或写锁,您就可以克隆该值的Arc,返回它并解锁哈希映射表。
更高级的方法是使用像arc-swap这样的crate,它说:

然后会锁定,克隆 [RwLock<Arc<T>>] 并解锁。这会导致CPU级别的争用(在锁和Arc的引用计数上),从而使其相对较慢。根据实现方式,由于稳定的读取流入,更新可能会被阻塞任意长的时间。

可以改为使用ArcSwap,它解决了上述问题,在争用和非争用场景中都比RwLock具有更好的性能特征。

我经常主张执行某种更智能的算法。例如,您可以启动N个线程,每个线程都有自己的{{HashMap}}。然后,您将工作分配给它们。对于上面的简单示例,您可以使用{{id%N_THREADS}},例如。还有复杂的分片方案,这取决于您的数据。
正如Go所做的一样很好的传播:不要通过共享内存来通信;相反,通过通信来共享内存。

1
@Charlie木匠 克隆 Arc 只涉及复制指针并原子性地增加引用计数 - 这是一项非常快速的操作。新克隆的值可以独立使用,不再需要对 RwLock 进行读/写锁定。 - Shepmaster
@Shepmaster,我能问一下为什么在你的第二个例子中使用RwLock<HashMap<K, Arc<Mutex<V>>>>吗?如果读写数量相同,那么Mutex<HashMap<K, Arc<Mutex<V>>>>在内存方面会更轻一些,但效果相同。此外,在您的分片示例中,您可以使用Vec<Mutex<HashMap<K, V>>>,并像您所说的那样使用hash(key)%shards.len()来检索分片,而不是旋转线程。 - Ervadac
如果我们假设哈希表的键永远不会被更新,只有值会被更新,那么我们能否将 RWLock 从哈希表中移除,并仅在值上保留锁定?类似这样的做法是否可行? - jimouris
1
@jimouris 当然可以,如果你是指像这样的东西(https://play.rust-lang.org/?version=stable&mode=debug&edition=2021&gist=673df199dd4e1a4ff88968d1a4b836ce)。 - Shepmaster
如果我没记错的话,Rayon 是一个线程池,而 thread::scope 不是。此外,Rayon 还有很多酷炫的并行迭代器功能。 - Shepmaster
显示剩余5条评论

9
假设数据的键可以映射到一个。您可以拥有Arc>>>。
当您初始化数据结构时,请在将其放入Arc之前填充所有第一级映射(初始化后将不可变)。
当您需要从地图中获取值时,您需要执行双重获取,类似以下内容:
data.get(&map_to_u8(&key)).unwrap().lock().expect("poison").get(&key),其中解包是安全的,因为我们使用所有值初始化了第一个映射。
要在地图中写入类似以下内容:
data.get(&map_to_u8(id)).unwrap().lock().expect("poison").entry(id).or_insert_with(|| value);
很容易看出争用被减少了,因为现在我们有256个Mutex,多个线程请求相同的Mutex的概率较低。
@Shepmaster的示例在我的机器上使用100个线程需要约10秒钟,而以下示例需要略微超过1秒钟。
use std::{
    collections::HashMap,
    sync::{Arc, Mutex, RwLock},
    thread,
    time::Duration,
};

fn main() {
    let mut inner = HashMap::new( );
    for i in 0..=u8::max_value() {
        inner.insert(i, Mutex::new(HashMap::new()));
    }
    let data = Arc::new(inner);

    let threads: Vec<_> = (0..100)
        .map(|i| {
            let data = Arc::clone(&data);
            thread::spawn(move || worker_thread(i, data))
        })
        .collect();

    for t in threads {
        t.join().expect("Thread panicked");
    }

    println!("{:?}", data);
}

fn worker_thread(id: u8, data: Arc<HashMap<u8,Mutex<HashMap<u8,Mutex<i32>>>>> ) {
    loop {

        // first unwrap is safe to unwrap because we populated for every `u8`
        if let Some(element) = data.get(&id).unwrap().lock().expect("poison").get(&id) {
            let mut element = element.lock().expect("Mutex poisoned");

            // Perform our normal work updating a specific element.
            // The entire HashMap only has a read lock, which
            // means that other threads can access it.
            *element += 1;
            thread::sleep(Duration::from_secs(1));

            return;
        }

        // If we got this far, the element doesn't exist

        // Get rid of our read lock and switch to a write lock
        // You want to minimize the time we hold the writer lock

        // We use HashMap::entry to handle the case where another thread
        // inserted the same key while where were unlocked.
        thread::sleep(Duration::from_millis(50));
        data.get(&id).unwrap().lock().expect("poison").entry(id).or_insert_with(|| Mutex::new(0));
        // Let the loop start us over to try again
    }
}


在Java中也被称为ConcurrentHashMap - Agoston Horvath
两级HashMap互斥锁,感谢分享。可以将第一级Mutex更改为RW锁,这样读取就不会阻塞读取了吗?只有在第二级ID不存在时才使用写锁。 - Charlie 木匠
例如:lock_map: Arc<HashMap<u8,RwLock<HashMap<u8,Mutex<usize>>>>> - Charlie 木匠
1
是的,将Mutex的第一级改为读写锁看起来是一个不错的改进。 - Riccardo Casatta
这种技术有时被称为分片。如果你事先知道要分多少片,你可以把外部容器设为数组(如果不是的话就用向量)- 这样你的类型就变成了例如 Arc<[RwLock<HashMap<K, V>>; 8]>。然后你就可以节省一次哈希计算和一次查找的间接引用。请注意,为了使这些操作高效,你的 u8 键需要根据你的数据均匀分布,否则某个容器可能会比其他容器更满。如果有人正在寻找现成的解决方案,这就是 DashMap 的工作原理。 - undefined

5
也许你想考虑使用 evmap
这是一个无锁、最终一致的并发多值映射。但需要权衡的是:读者在作者进行“刷新”之前不会看到更改。刷新是原子性的,作者决定何时执行它并向读者公开新数据。

"evmap" 可以在每次写操作后刷新,以完全一致的模式下使用。 - Ibraheem Ahmed
写入和刷新操作并不是原子操作,因此我认为这并不符合“完全一致”的标准,因为读取请求仍然可能与刷新本身竞争,对吧? - jeremyong
@jeremyong 你为什么认为它不是原子性的?我现在找不到任何信息。据我理解,它通过保留两份地图副本来工作:一份用于读取,一份用于写入。写入操作不是原子性的,但交换这两个引用可能是原子性的。 - jonas
@jonas 在写入时,读取映射没有被锁定,因此在刷新可见之前,某人可能会发出读取请求。 - jeremyong
@jeremyong 是的,它是无锁的且始终保持一致性(因为写入者决定何时同步),但可能不是最新的。这就是所谓的“最终一致性”在此的含义:写入者和读取者之间最终一致,但如果写入者采取合理的措施,数据始终可以保持一致性。 - jonas
1
@jonas 我并不是在回应你的答案,而是在回应 Ibrahim 的评论,他认为 evmap 可以在“完全一致模式”下使用(我认为这是错误的)。 - jeremyong

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接