在Swift中的多线程函数式编程

7

最近我在使用Swift 2.1 操作字节数组,我经常会写出以下类似的代码:

// code to add functions to a [UInt8] object
extension CollectionType where Generator.Element == UInt8 {

    func xor(with byte: UInt8) -> [UInt8] {
        return map { $0 ^ byte }
    }
}

// example usage: [67, 108].xor(with: 0) == [67, 108]

有没有一种简单的方法可以并行化这个map调用,以便多个线程可以同时操作数组的非重叠区域?
我可以编写代码手动将数组分成子数组,并在不同的线程上调用map。但我想知道Swift是否存在某些框架可以自动进行分割,因为map是一个可以在无副作用的线程安全环境中工作的函数调用。
澄清说明: 1. 代码只需要在[UInt8]对象上工作,不一定是每个CollectionType
3个回答

8
执行并行计算的最简单方法是使用concurrentPerform(以前称为dispatch_apply;请参阅Concurrency Programming Guide中的Performing Loop Iterations Concurrently)。但是,没有map函数可以为您完成此操作。您必须自己完成。

例如,您可以编写一个扩展程序来执行并发任务:

extension Array {
    public func concurrentMap<T>(_ transform: (Element) -> T) -> [T] {
        var results = [Int: T](minimumCapacity: count)

        let lock = NSLock()

        DispatchQueue.concurrentPerform(iterations: count) { index in
            let result = transform(self[index])
            lock.synchronized {
                results[index] = result
            }
        }

        return (0 ..< results.count).compactMap { results[$0] }
    }
}

在哪里

extension NSLocking {
    func synchronized<T>(block: () throws -> T) rethrows -> T {
        lock()
        defer { unlock() }
        return try block()
    }
}

您可以使用任何同步机制(锁、串行队列、读写器),但是想法是并发执行transform,然后同步更新集合。
注意:
- 这将阻塞您从中调用它的线程(就像非并发的map一样),因此请确保将其调度到后台队列。 - 需要确保每个线程上有足够的工作来证明管理所有这些线程的固有开销是合理的。(例如,每次循环一个简单的异或调用是不够的,您会发现它实际上比非并发版本更慢。)在这些情况下,请确保跨越适当的步幅(请参见改善循环代码,平衡每个并发块的工作量)。例如,而不是执行5000次一种极其简单的操作,可以在每个循环中执行500次10次迭代。您可能需要尝试适当的步幅值。
虽然我怀疑您不需要这个讨论,但对于不熟悉concurrentPerform(以前称为dispatch_apply)的读者,我将在下面说明其用法。有关该主题的更完整讨论,请参见上面的链接。
例如,让我们考虑比简单的xor复杂得多的东西(因为对于这样简单的东西,开销超过了任何性能收益),例如一个天真的斐波那契实现:
func fibonacci(_ n: Int) -> Int {
    if n == 0 || n == 1 {
        return n
    }
    return fibonacci(n - 1) + fibonacci(n - 2)
}

如果你有一个 Int 值的数组需要计算,而不是:

let results = array.map { fibonacci($0) }

您可以:
var results = [Int](count: array.count, repeatedValue: 0)
DispatchQueue.concurrentPerform(iterations: array.count) { index in
    let result = self.fibonacci(array[index])
    synchronize.update { results[index] = result }      // use whatever synchronization mechanism you want
}

或者,如果你想要一个功能性的版本,你可以使用我上面定义的那个 扩展

let results = array.concurrentMap { fibonacci($0) }

对于Swift 2版本,请参见此回答的先前版本


就像我在现已删除的答案中所说的那样,我一直在进行一些基准测试。如果您将数组分成x个切片,其中x = NSProcessInfo().activeProcessorCount * 4,则速度会稍微快一些。每个切片都有自己的dispatch_apply - R Menke
这个实现在我将其插入我的测试框架时会捕获解包空可选项,即使修复了这个问题,它也非常低效,因为它为每个元素排队一个异步操作。 - Dave Abrahams
如果您的计算量对于concurrentPerform的每次迭代来说不够大,那么您需要使用stride。请参考https://stackoverflow.com/a/39949292/1271826以获取stride的示例。 - Rob

3

我的实现似乎是正确的,并且与我见过的所有其他实现相比表现良好。测试和基准测试在这里

extension RandomAccessCollection {
    /// Returns `self.map(transform)`, computed in parallel.
    ///
    /// - Requires: `transform` is safe to call from multiple threads.
    func concurrentMap<B>(_ transform: (Element) -> B) -> [B] {
        let batchSize = 4096 // Tune this
        let n = self.count
        let batchCount = (n + batchSize - 1) / batchSize
        if batchCount < 2 { return self.map(transform) }

        return Array(unsafeUninitializedCapacity: n) {
            uninitializedMemory, resultCount in
            resultCount = n
            let baseAddress = uninitializedMemory.baseAddress!

            DispatchQueue.concurrentPerform(iterations: batchCount) { b in
                let startOffset = b * n / batchCount
                let endOffset = (b + 1) * n / batchCount
                var sourceIndex = index(self.startIndex, offsetBy: startOffset)
                for p in baseAddress+startOffset..<baseAddress+endOffset {
                    p.initialize(to: transform(self[sourceIndex]))
                    formIndex(after: &sourceIndex)
                }
            }
        }
    }
}

希望这有所帮助, -Dave

你链接的要点看起来非常有趣,谢谢! - Sentry.co

-1

你可以使用parMap(),它是并行映射。你可以使用活动监视器来检查是否为并行映射。

func map<T: Collection, U>( _ transform: (T.Iterator.Element) -> U, _ xs: T) -> [U] {
    return xs.reduce([U](), {$0 + [transform($1)]})
}

public func parMap<T,U>(_ transform: @escaping (T)->U, _ xs: [T]) -> [U] {
    let len     = xs.count
    var results = [U?](repeating: nil, count: len)
    let process = { (i: Int) -> Void in results[i] = transform(xs[i]) }
    DispatchQueue.concurrentPerform(iterations: len, execute: process)
    return map({$0!}, results)
}

func test() {
    parMap({_ in Array(1...10000000).reduce(0,+)}, Array(1...10))
}

enter image description here


2
这个程序存在结果竞争条件。 - Dave Abrahams

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接