How to implement a thread-safe hash table (phone book) data structure in Swift?

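I'm trying to implement a thread-safe phone book object. The phone book should be able to add a person and look up a person by name or by phone number. From an implementation standpoint this simply involves two hash tables: one mapping name -> Person, and another mapping phone number -> Person.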

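The caveat is that I want this object to be thread safe. That means I'd like to support concurrent lookups in the PhoneBook while ensuring that only one thread at a time can add a Person to the PhoneBook. This is the basic readers-writers problem, and I'm trying to solve it using Grand Central Dispatch and dispatch barriers. But I'm running into problems. Here is my Swift playground code: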

//: Playground - noun: a place where people can play

import UIKit
import PlaygroundSupport

PlaygroundPage.current.needsIndefiniteExecution = true

public class Person: CustomStringConvertible {
    public var description: String {
        get {
            return "Person: \(name), \(phoneNumber)"
        }
    }

    public var name: String
    public var phoneNumber: String
    private var readLock = ReaderWriterLock()

    public init(name: String, phoneNumber: String) {
        self.name = name
        self.phoneNumber = phoneNumber
    }


    public func uniquePerson() -> Person {
        let randomID = UUID().uuidString
        return Person(name: randomID, phoneNumber: randomID)
    }
}

public enum Qos {
    case threadSafe, none
}

public class PhoneBook {

    private var qualityOfService: Qos = .none
    public var nameToPersonMap = [String: Person]()
    public var phoneNumberToPersonMap = [String: Person]()
    private var readWriteLock = ReaderWriterLock()


    public init(_ qos: Qos) {
        self.qualityOfService = qos
    }

    public func personByName(_ name: String) -> Person? {
        var person: Person? = nil
        if qualityOfService == .threadSafe {
            readWriteLock.concurrentlyRead { [weak self] in
                guard let strongSelf = self else { return }
                person = strongSelf.nameToPersonMap[name]
            }
        } else {
            person = nameToPersonMap[name]
        }

        return person
    }

    public func personByPhoneNumber( _ phoneNumber: String) -> Person? {
        var person: Person? = nil
        if qualityOfService == .threadSafe {
            readWriteLock.concurrentlyRead { [weak self] in
                guard let strongSelf = self else { return }
                person = strongSelf.phoneNumberToPersonMap[phoneNumber]
            }
        } else {
            person = phoneNumberToPersonMap[phoneNumber]
        }

        return person
    }

    public func addPerson(_ person: Person) {
        if qualityOfService == .threadSafe {
            readWriteLock.exclusivelyWrite { [weak self] in
                guard let strongSelf = self else { return }
                strongSelf.nameToPersonMap[person.name] = person
                strongSelf.phoneNumberToPersonMap[person.phoneNumber] = person
            }
        } else {
            nameToPersonMap[person.name] = person
            phoneNumberToPersonMap[person.phoneNumber] = person
        }
    }

}


// A ReaderWriterLock implemented using GCD and OS Barriers.
public class ReaderWriterLock {

    private let concurrentQueue = DispatchQueue(label: "com.ReaderWriterLock.Queue", attributes: DispatchQueue.Attributes.concurrent)
    private var writeClosure: (() -> Void)!

    public func concurrentlyRead(_ readClosure: (() -> Void)) {
        concurrentQueue.sync {
            readClosure()
        }
    }

    public func exclusivelyWrite(_ writeClosure: @escaping (() -> Void)) {
        self.writeClosure = writeClosure
        concurrentQueue.async(flags: .barrier) { [weak self] in
            guard let strongSelf = self else { return }
            strongSelf.writeClosure()
        }
    }

}

// MARK: Testing the synchronization and thread-safety

for _ in 0..<5 {
    let iterations = 1000
    let phoneBook = PhoneBook(.none)

    let concurrentTestQueue = DispatchQueue(label: "com.PhoneBookTest.Queue", attributes: DispatchQueue.Attributes.concurrent)
    for _ in 0..<iterations {
        let person = Person(name: "", phoneNumber: "").uniquePerson()
        concurrentTestQueue.async {
            phoneBook.addPerson(person)
        }
    }

    sleep(10)
    print(phoneBook.nameToPersonMap.count)
}

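To test my code, I run 1000 concurrent threads, each of which simply adds a new Person to the phone book. Every Person is unique, so after those 1000 threads finish I expect the phone book to contain 1000 entries. Every time I perform a write, I make a dispatch_barrier call, update the hash tables, and return. As far as I know, that is all we need to do. However, after repeated runs of the 1000 threads, I find that the number of entries in the phone book is inconsistent and all over the place: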

Phone Book Entries: 856
Phone Book Entries: 901
Phone Book Entries: 876
Phone Book Entries: 902
Phone Book Entries: 912

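Can someone help me figure out what's going on? Is there something wrong with my locking code, or, worse, with how I constructed my test? I'm very new to this multithreading problem space. Thanks!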


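I think your problem is a race condition around sleep(10). You wouldn't use it in a real application where you need to wait for all the work to finish. As an experiment, try changing it to sleep(30) and see whether the results improve. - Phillip Mills
In a real-world scenario, how would I express "wait for all the work to finish"? I'm uneasy about the sleep function too. Using sleep(30) didn't help either. - AyBayBay
By the way, I assume this model is just for exploring thread safety, but I might suggest a slightly different structure: an array of [Person], plus methods that return an array of people for a given phone number or name (e.g., using filter). Many of us have multiple contacts with the same phone number (e.g., a shared business line, or a family sharing a home phone). I even have a couple of different people with the same name (e.g., I have two genuinely different "Paul Williams"). Do it however you like; this is just my two cents. - Rob
3 Answers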

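The problem is your ReaderWriterLock. You are saving writeClosure as a property, and then asynchronously dispatching a closure that calls that saved property. But if another exclusivelyWrite call came in during the intervening period, your writeClosure property would be replaced with the new closure.
In this case, that means you can end up adding the same Person multiple times. And because you're using a dictionary, those duplicates have the same key, and therefore you don't see all 1000 entries.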
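To see this concretely, here is a minimal sketch that forces the unlucky interleaving. It is an illustration and was not part of the original answer. BuggyLock is a hypothetical stand-in that mirrors the question's exclusivelyWrite. The queue is suspended so that all three writes are enqueued before any of them runs. The sketch assumes a command-line Swift script in Swift 5 language mode.

```swift
import Foundation

// Hypothetical stand-in for the question's lock: the write closure is stored
// in a shared property and read only when the barrier block eventually runs.
final class BuggyLock {
    let queue = DispatchQueue(label: "demo.buggyLock", attributes: .concurrent)
    private var writeClosure: (() -> Void)!

    func exclusivelyWrite(_ block: @escaping () -> Void) {
        writeClosure = block                        // each call overwrites the previous closure
        queue.async(flags: .barrier) { [unowned self] in
            self.writeClosure()                     // runs whatever closure is stored *now*
        }
    }
}

let lock = BuggyLock()
var written: [Int] = []

lock.queue.suspend()                                // enqueue all writes before any of them runs
for i in 1...3 {
    lock.exclusivelyWrite { written.append(i) }
}
lock.queue.resume()
lock.queue.sync(flags: .barrier) {}                 // wait for the three barrier blocks

print(written)                                      // [3, 3, 3]: writes 1 and 2 were lost
```

All three barrier blocks call the last stored closure. The closures for 1 and 2 are never run, which is exactly how unique Persons go missing from the phone book.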
You can actually simplify ReaderWriterLock and eliminate that property entirely. I'd also make concurrentlyRead generic, returning the value (just like sync does) and rethrowing any errors.
public class ReaderWriterLock {
    private let queue = DispatchQueue(label: "com.domain.app.rwLock", attributes: .concurrent)
    
    public func concurrentlyRead<T>(_ block: (() throws -> T)) rethrows -> T {
        return try queue.sync {
            try block()
        }
    }
    
    public func exclusivelyWrite(_ block: @escaping (() -> Void)) {
        queue.async(flags: .barrier) {
            block()
        }
    }
}
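Here is one way to check the simplified lock outside a playground. It is a sketch that assumes a Swift 5 command-line script. The table dictionary and the DispatchQueue.concurrentPerform harness are illustrative additions. The lock is copied from above so the snippet stands alone.

```swift
import Foundation

// Copy of the simplified lock from the answer above.
public class ReaderWriterLock {
    private let queue = DispatchQueue(label: "com.domain.app.rwLock", attributes: .concurrent)

    public func concurrentlyRead<T>(_ block: (() throws -> T)) rethrows -> T {
        return try queue.sync {
            try block()
        }
    }

    public func exclusivelyWrite(_ block: @escaping (() -> Void)) {
        queue.async(flags: .barrier) {
            block()
        }
    }
}

// Illustrative harness: 1000 concurrent writers, then a single reader.
let lock = ReaderWriterLock()
var table = [String: Int]()

DispatchQueue.concurrentPerform(iterations: 1000) { i in
    lock.exclusivelyWrite { table["key\(i)"] = i }   // each closure captures its own i
}

// This sync read is queued behind every pending barrier write.
let total = lock.concurrentlyRead { table.count }
print(total)   // 1000
```

concurrentPerform returns only after every iteration has enqueued its barrier write. That means the sync read inside concurrentlyRead cannot start until all of those writes have finished, so no sleep is needed.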

A few other unrelated observations:
  1. By the way, this simplified ReaderWriterLock happens to solve another concern. That writeClosure property, which we've now removed, could easily have introduced a strong reference cycle.

    Yes, you were scrupulous about using [weak self], so there wasn't any strong reference cycle, but it was possible. I would advise that, wherever you employ a closure property, you set that closure property to nil when you're done with it, so any strong references that closure may have accidentally entailed will be resolved. That way a persistent strong reference cycle is never possible. (Plus, the closure itself and any local variables or other external references it captured will be released.)

  2. You're sleeping for 10 seconds. That should be more than enough, but I'd advise against just adding arbitrary sleep calls (because you can never be 100% sure). Fortunately, you have a concurrent queue, so you can use that:

    concurrentTestQueue.async(flags: .barrier) { 
        print(phoneBook.count) 
    }
    

    Because of that barrier, it will wait until everything else you put on that queue is done.

  3. Note, I did not just print nameToPersonMap.count. This dictionary has been carefully synchronized within PhoneBook, so you can't just let random external classes access it directly without synchronization.

    Whenever you have some property that you're synchronizing internally, it should be private, and you should then create a thread-safe function/variable to retrieve whatever you need:

    public class PhoneBook {
    
        private var nameToPersonMap = [String: Person]()
        private var phoneNumberToPersonMap = [String: Person]()
    
        ...
    
        var count: Int {
            return readWriteLock.concurrentlyRead {
                nameToPersonMap.count
            }
        }
    }
    
  4. You say you're testing thread safety, but then created PhoneBook with .none option (achieving no thread-safety). In that scenario, I'd expect problems. You have to create your PhoneBook with the .threadSafe option.

  5. You have a number of strongSelf patterns. That's rather unswifty. It is generally not needed in Swift as you can use [weak self] and then just do optional chaining.
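The comments above ask how to say "wait until all the work is done" without sleep. Besides the barrier in point 2, a DispatchGroup is a common way to express this. Below is a minimal sketch, assuming a command-line script. Book is a hypothetical, stripped-down stand-in for PhoneBook that uses the same barrier-write/sync-read pattern.

```swift
import Foundation

// Hypothetical stand-in for PhoneBook: a dictionary guarded by a concurrent
// queue, with barrier writes and sync reads.
final class Book {
    private let queue = DispatchQueue(label: "demo.book", attributes: .concurrent)
    private var entries = [String: String]()

    func add(name: String, phone: String) {
        queue.async(flags: .barrier) { self.entries[name] = phone }
    }

    var count: Int {
        return queue.sync { entries.count }
    }
}

let book = Book()
let testQueue = DispatchQueue(label: "demo.test", attributes: .concurrent)
let group = DispatchGroup()

for _ in 0..<1000 {
    let id = UUID().uuidString
    testQueue.async(group: group) {
        book.add(name: id, phone: id)
    }
}

// Blocks until every test block has *called* add(...). The barrier writes may
// still be pending on the book's queue, but `count` is a sync read ordered
// after them, so it observes all of them.
group.wait()
print(book.count)   // 1000
```

In a playground or an app you would normally avoid blocking the main thread with wait(). Use group.notify(queue: .main) { ... } to run the final check instead.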

Pulling all of this together, here is my final playground:
import Foundation
import PlaygroundSupport

PlaygroundPage.current.needsIndefiniteExecution = true

public class Person {
    public let name: String
    public let phoneNumber: String
    
    public init(name: String, phoneNumber: String) {
        self.name = name
        self.phoneNumber = phoneNumber
    }
    
    public static func uniquePerson() -> Person {
        let randomID = UUID().uuidString
        return Person(name: randomID, phoneNumber: randomID)
    }
}

extension Person: CustomStringConvertible {
    public var description: String {
        return "Person: \(name), \(phoneNumber)"
    }
}

public enum ThreadSafety { // Changed the name from Qos, because this has nothing to do with quality of service, but is just a question of thread safety
    case threadSafe, none
}

public class PhoneBook {
    
    private var threadSafety: ThreadSafety
    private var nameToPersonMap = [String: Person]()        // if you're synchronizing these, you really shouldn't expose them to the public
    private var phoneNumberToPersonMap = [String: Person]() // if you're synchronizing these, you really shouldn't expose them to the public
    private var readWriteLock = ReaderWriterLock()
    
    public init(_ threadSafety: ThreadSafety) {
        self.threadSafety = threadSafety
    }
    
    public func personByName(_ name: String) -> Person? {
        if threadSafety == .threadSafe {
            return readWriteLock.concurrentlyRead { [weak self] in
                self?.nameToPersonMap[name]
            }
        } else {
            return nameToPersonMap[name]
        }
    }
    
    public func personByPhoneNumber(_ phoneNumber: String) -> Person? {
        if threadSafety == .threadSafe {
            return readWriteLock.concurrentlyRead { [weak self] in
                self?.phoneNumberToPersonMap[phoneNumber]
            }
        } else {
            return phoneNumberToPersonMap[phoneNumber]
        }
    }
    
    public func addPerson(_ person: Person) {
        if threadSafety == .threadSafe {
            readWriteLock.exclusivelyWrite { [weak self] in
                self?.nameToPersonMap[person.name] = person
                self?.phoneNumberToPersonMap[person.phoneNumber] = person
            }
        } else {
            nameToPersonMap[person.name] = person
            phoneNumberToPersonMap[person.phoneNumber] = person
        }
    }
    
    var count: Int {
        return readWriteLock.concurrentlyRead {
            nameToPersonMap.count
        }
    }
}

// A ReaderWriterLock implemented using GCD concurrent queue and barriers.

public class ReaderWriterLock {
    private let queue = DispatchQueue(label: "com.domain.app.rwLock", attributes: .concurrent)
    
    public func concurrentlyRead<T>(_ block: (() throws -> T)) rethrows -> T {
        return try queue.sync {
            try block()
        }
    }
    
    public func exclusivelyWrite(_ block: @escaping (() -> Void)) {
        queue.async(flags: .barrier) {
            block()
        }
    }
}


for _ in 0 ..< 5 {
    let iterations = 1000
    let phoneBook = PhoneBook(.threadSafe)
    
    let concurrentTestQueue = DispatchQueue(label: "com.PhoneBookTest.Queue", attributes: .concurrent)
    for _ in 0..<iterations {
        let person = Person.uniquePerson()
        concurrentTestQueue.async {
            phoneBook.addPerson(person)
        }
    }
    
    concurrentTestQueue.async(flags: .barrier) {
        print(phoneBook.count)
    }
}

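Personally, I'd be inclined to go further, moving the synchronization into a generic class, and to

  • change the model to an array of Person objects, so that:
    • the model supports multiple people with the same phone number; and
    • you can use value types if you want.

For example: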

public struct Person {
    public let name: String
    public let phoneNumber: String
    
    public static func uniquePerson() -> Person {
        return Person(name: UUID().uuidString, phoneNumber: UUID().uuidString)
    }
}

public struct PhoneBook {
    
    private var synchronizedPeople = Synchronized([Person]())
    
    public func people(name: String? = nil, phone: String? = nil) -> [Person]? {
        return synchronizedPeople.value.filter {
            (name == nil || $0.name == name) && (phone == nil || $0.phoneNumber == phone)
        }
    }
    
    public func append(_ person: Person) {
        synchronizedPeople.writer { people in
            people.append(person)
        }
    }
    
    public var count: Int {
        return synchronizedPeople.reader { $0.count }
    }
}

/// A structure to provide thread-safe access to some underlying object using reader-writer pattern.

public class Synchronized<T> {
    /// Private value. Use `public` `value` computed property (or `reader` and `writer` methods)
    /// for safe, thread-safe access to this underlying value.
    
    private var _value: T
    
    /// Private reader-writer synchronization queue
    
    private let queue = DispatchQueue(label: Bundle.main.bundleIdentifier! + ".synchronized", qos: .default, attributes: .concurrent)
    
    /// Create `Synchronized` object
    ///
    /// - Parameter value: The initial value to be synchronized.
    
    public init(_ value: T) {
        _value = value
    }
    
    /// A thread-safe variable to set and get the underlying object, as a convenience when higher-level synchronization is not needed
    
    public var value: T {
        get { reader { $0 } }
        set { writer { $0 = newValue } }
    }
    
    /// A "reader" method to allow thread-safe, read-only concurrent access to the underlying object.
    ///
    /// - Warning: If the underlying object is a reference type, you are responsible for making sure you
    ///            do not mutate anything. If you stick with value types (`struct` or primitive types),
    ///            this will be enforced for you.
    
    public func reader<U>(_ block: (T) throws -> U) rethrows -> U {
        return try queue.sync { try block(_value) }
    }
    
    /// A "writer" method to allow thread-safe write with barrier to the underlying object
    
    public func writer(_ block: @escaping (inout T) -> Void) {
        queue.async(flags: .barrier) {
            block(&self._value)
        }
    }
}
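Here is a short usage sketch of Synchronized in which a shared counter is incremented from many threads. It assumes a command-line script. The class is condensed from the code above so the snippet stands alone. The fixed queue label is a substitution, because Bundle.main.bundleIdentifier can be nil outside an app or playground bundle.

```swift
import Foundation

// Condensed copy of Synchronized<T>; only the queue label differs.
public final class Synchronized<T> {
    private var _value: T
    private let queue = DispatchQueue(label: "demo.synchronized", attributes: .concurrent)

    public init(_ value: T) { _value = value }

    public var value: T {
        get { reader { $0 } }
        set { writer { $0 = newValue } }
    }

    public func reader<U>(_ block: (T) throws -> U) rethrows -> U {
        return try queue.sync { try block(_value) }
    }

    public func writer(_ block: @escaping (inout T) -> Void) {
        queue.async(flags: .barrier) { block(&self._value) }
    }
}

let counter = Synchronized(0)

DispatchQueue.concurrentPerform(iterations: 1000) { _ in
    // Read, increment and write all happen inside one barrier block, so no
    // update is lost. `counter.value += 1` would instead perform a separate
    // synchronized read and a separate synchronized write, which can race.
    counter.writer { $0 += 1 }
}

print(counter.value)   // 1000 (the sync read waits for the pending barrier writes)
```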

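Simply an amazing answer. Thank you so much, this explains a lot for me... Honestly, not seeing the closure issue was a rookie mistake :P Thanks again. - AyBayBay
@coping - Yes, because of writer: you can't have a mutating method in a struct when the mutation is performed asynchronously. - Rob
@rob: Ah, that makes sense. Thanks for the clarification. - coping
@Kiran Yes. Sometimes you just need a simple synchronized accessor, and the accessor produces more natural code. E.g., maybe you're just resetting a synchronized integer: foo.value = 0 is more natural than foo.writer { $0 = 0 } or foo.writer { value in value = 0 }. But say I want to increment the value. I can't use the accessor (because incrementing is a multi-step operation that reads the value, increments it, and writes it back), and foo.value += 1 is not thread-safe, even though it looks simple. You have to use foo.writer { $0 += 1 } or the equivalent. - Rob
But if you find providing both confusing, then remove the computed property and lose its more concise syntax. (In my opinion, though, you shouldn't remove the methods, because quite often you really do need higher-level synchronization to achieve thread safety.) - Rob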

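In some cases you might use the NSCache class. The documentation says it is thread-safe:
You can add, remove, and query items in the cache from different threads without having to lock the cache yourself.
Here is an article that describes some very useful tricks related to NSCache.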
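Here is a minimal sketch of that idea. It assumes the phone book only needs lookups and can tolerate lost entries. Two NSCache instances replace the two dictionaries. Person is a simplified class here, because NSCache requires class-typed keys and values, which is also why the keys are NSString.

```swift
import Foundation

// Simplified Person; NSCache needs class (AnyObject) keys and values.
final class Person {
    let name: String
    let phoneNumber: String
    init(name: String, phoneNumber: String) {
        self.name = name
        self.phoneNumber = phoneNumber
    }
}

let byName = NSCache<NSString, Person>()
let byPhone = NSCache<NSString, Person>()

DispatchQueue.concurrentPerform(iterations: 100) { i in
    let person = Person(name: "name\(i)", phoneNumber: "555-\(i)")
    // No explicit lock: NSCache is documented as safe to use from multiple threads.
    byName.setObject(person, forKey: NSString(string: person.name))
    byPhone.setObject(person, forKey: NSString(string: person.phoneNumber))
}

let found = byName.object(forKey: NSString(string: "name42"))
print(found?.phoneNumber ?? "not found")
```

There are two caveats. First, NSCache may evict entries (for example under memory pressure), so it is a cache rather than a durable store. Second, the two caches are updated independently, so a reader can briefly find a person by name but not yet by phone number.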

I don't think you're using it wrong :).

The original version on macOS produces:

0  swift                    0x000000010c9c536a PrintStackTraceSignalHandler(void*) + 42
1  swift                    0x000000010c9c47a6 SignalHandler(int) + 662
2  libsystem_platform.dylib 0x00007fffbbdadb3a _sigtramp + 26
3  libsystem_platform.dylib 000000000000000000 _sigtramp + 1143284960
4  libswiftCore.dylib       0x0000000112696944 _T0SSwcp + 36
5  libswiftCore.dylib       0x000000011245fa92 _T0s24_VariantDictionaryBufferO018ensureUniqueNativeC0Sb11reallocated_Sb15capacityChangedtSiF + 1634
6  libswiftCore.dylib       0x0000000112461fd2 _T0s24_VariantDictionaryBufferO17nativeUpdateValueq_Sgq__x6forKeytF + 1074

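If you remove ".concurrent" from the ReaderWriter queue, "the problem goes away". If you restore .concurrent but change the async call on the writer side to sync:

swift(10504,0x70000896f000) malloc: *** error for object 0x7fcaa440cee8: incorrect checksum for freed object - object was probably modified after being freed.

Which would be a little surprising if this weren't Swift? I dug deeper: I replaced your String-based arrays with Int-based ones by inserting a hash function, and replaced sleep(10) with a barrier dispatch to flush out any straggling blocks. That made it crash more reproducibly, with the somewhat more helpful:

x(10534,0x700000f01000) malloc: *** error for object 0x7f8c9ee00008: incorrect checksum for freed object - object was probably modified after being freed.

But when a search of the source turns up no malloc or free, a stack dump would probably be more useful.

Anyway, the best way to solve your problem: switch to Go; it actually makes sense.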


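Removing ".concurrent" makes the problem go away because you are then using a serial queue. A serial queue doesn't need a reader-writer lock in the first place, because the blocks run one after another, in order; that's why it works. Also, you can't dispatch synchronously with a barrier because the API doesn't support it, which is also why you got the crash. I don't understand your proposed solution; simply telling me to use "Go" isn't enough. The purpose of this question is to help me solve this problem in Swift. - AyBayBay
I think your intentions were good, but unfortunately people have downvoted you. I had to as well. - AyBayBay
Removing .concurrent not only means it's no longer a reader-writer (so you lose that advantage), but it also doesn't actually solve the underlying problem. Take the original code, make the queue serial, and the problem will still be there. The problem is the closure property that is called asynchronously. I bet that in tidying up his code you removed that unnecessary property, and that is what actually eliminated the problem. - Rob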
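Here is a sketch supporting the last point, assuming a command-line Swift 5 script. The hypothetical SerialBuggyLock keeps the question's closure property but uses a serial queue with no barrier. Suspending the queue forces the interleaving that otherwise shows up only intermittently.

```swift
import Foundation

// Hypothetical variant of the question's lock on a *serial* queue
// (no .concurrent attribute, no barrier flag).
final class SerialBuggyLock {
    let queue = DispatchQueue(label: "demo.serialBuggy")   // serial by default
    private var writeClosure: (() -> Void)!

    func exclusivelyWrite(_ block: @escaping () -> Void) {
        writeClosure = block                                // still a shared, overwritten property
        queue.async { [unowned self] in self.writeClosure() }
    }
}

let lock = SerialBuggyLock()
var written: [Int] = []

lock.queue.suspend()             // enqueue all writes before any of them runs
for i in 1...3 {
    lock.exclusivelyWrite { written.append(i) }
}
lock.queue.resume()
lock.queue.sync {}               // serial queue: waits for the three queued blocks

print(written)                   // [3, 3, 3]: serializing the queue did not fix it
```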

Web content provided by Stack Overflow.