如何在 PowerShell 的 ForEach-Parallel 循环中添加原子计数器

5
在这个问题中,解释了如何向并发的ThreadSafe集合Powershell: How to add Result to an Array (ForEach-Object -Parallel)添加内容。
我有一个更简单的用例,只是想增加一个单一的值(整数)。
在Powershell中是否可以使用某种原子整数数据类型来实现?
$myAtomicCounter = 0

$myItems | ForEach-Object -Parallel {
    #...other work

    $myAtomicCounter.ThreadSafeAdd(2)

    # .. some more work using counter
}

Write-Host($myAtomicCounter)

这是可能的,但也意味着在给定的时间只有一个线程可以更新该值(也就是说,在此期间其他线程被锁定)。你能解释一下为什么要这样做吗?可能有更好的方法。 - Santiago Squarzon
@SantiagoSquarzon 我想要追踪每个项目中所包含的值的总和。例如,如果我处理了item1、item3、item4,当我处理item2时,我希望手头有所有item1.val、item2.val和item4.val的总和可供使用。我可以构建一个作业哈希映射并扫描以查看哪些已完成,然后每次求和,但我更喜欢像这样使用计数器。 - Ryu S.
1
最简单的方法是将并行循环中处理过的计数输出到一个新的管道线性循环中,该循环将以线程安全的方式不断更新已处理项目的计数。 - Santiago Squarzon
1个回答

7
在PowerShell中,当从多个线程更新单个值时,必须使用锁定机制,例如MutexSemaphoreSlim甚至Monitor.Enter,否则更新操作将不是线程安全的同步哈希表不能确保更新键值的线程安全性
下面是一个简单的演示,证明了上述观点:
$sync = [hashtable]::Synchronized(@{ })
$attempts = 0

do {
    $sync['Value'] = 0
    $attempts++
    0..10 | ForEach-Object -Parallel {
        $sync = $using:sync
        Start-Sleep -Milliseconds 200
        $sync['Value']++
    } -ThrottleLimit 11
}
while ($sync['Value'] -eq 11)

"It took $attempts attempts to fail..."

假设我们有一个数组的数组:
$toProcess = 0..10 | ForEach-Object {
    , (Get-Random -Count (Get-Random -Minimum 5 -Maximum 10))
}

如果你想要跟踪每个数组中处理过的项目,可以使用Mutex来实现:
$processedItems = [hashtable]::Synchronized(@{
    Lock    = [System.Threading.Mutex]::new()
    Counter = 0
})

$toProcess | ForEach-Object -Parallel {
    # using sleep as to emulate doing something here
    Start-Sleep (Get-Random -Maximum 5)

    # bring the local variable to this scope
    $ref = $using:processedItems
    # lock this thread until I can write
    if($ref['Lock'].WaitOne()) {
        # when I can write, update the value
        $ref['Counter'] += $_.Count
        # and realease this lock so others threads can write
        $ref['Lock'].ReleaseMutex()
    }
}

$processedCount = ($toProcess | Write-Output | Measure-Object).Count

# Should be True:
$processedItems['Counter'] -eq $processedCount

另一个使用Monitor.Enter和一个自定义函数来增加计数器的 tread safe 示例,试图模仿 C# 的lock语句
function lock {
    param(
        [Parameter(Mandatory)]
        [object] $Object,

        [Parameter(Mandatory)]
        [scriptblock] $ScriptBlock
    )

    try {
        [System.Threading.Monitor]::Enter($Object)
        & $ScriptBlock
    }
    finally {
        [System.Threading.Monitor]::Exit($Object)
    }
}

$utils = [hashtable]::Synchronized(@{
    LockFunc = $function:lock.ToString()
    Counter  = @(0)
})

$toProcess | ForEach-Object -Parallel {
    # bring the utils var to this scope
    $utils = $using:utils
    # define the `lock` function here
    $function:lock = $utils['LockFunc']

    Start-Sleep (Get-Random -Maximum 5)

    # lock the counter array
    lock($utils['Counter'].SyncRoot) {
        # increment and release when done
        $utils['Counter'][0] += $_.Count
    }
}

$processedCount = ($toProcess | Write-Output | Measure-Object).Count

# Should be True:
$utils['Counter'][0] -eq $processedCount

在PowerShell中,一个更简单的方法是将并行循环的输出转换为线性循环,在这里您可以安全地更新计数器,而不需要担心线程安全性。
$counter = 0

$toProcess | ForEach-Object -Parallel {
    # using sleep as to emulate doing something here
    Start-Sleep (Get-Random -Maximum 5)

    # when this thread is done,
    # output this array of processed items
    $_
    
} | ForEach-Object {
    # then the output from the parallel loop is received in this linear
    # thread safe loop where we can update the counter
    $counter += $_.Count
}

$processedCount = ($toProcess | Write-Output | Measure-Object).Count

# Should be True:
$counter -eq $processedCount

1
C# 的开发人员建议使用 Interlocked.Increment 作为计数器,您觉得如何? - zett42
2
mmm 在 PowerShell 中似乎无法很好地运行。@zett42 我没有得到一致的结果 $i = [ref] 0; 0..100 | ForEach-Object -Parallel { $i = $using:i; Start-Sleep (Get-Random -Maximum 4); $null = [System.Threading.Interlocked]::Increment($i) } 有时预计是101,而其他情况下则小于此数。 - Santiago Squarzon
1
我还没有在PowerShell中使用过 Interlocked.Increment。感谢您的测试!它不能可靠地工作,可能是因为PowerShell的 [ref] 不是真正的引用,而只是一个包装类的缘故。 - zett42
1
@Dennis AddOrUpdate可以根据其名称的意思,添加或更新一个键值对。然而,要增加单个值,您需要事先知道该值,并在其基础上加上+ 1。在增加之前需要锁定的原因很简单,2个或更多线程可以同时尝试读取该值,然后同时尝试更新该值,从而创建了竞争条件。锁定确保在给定的时间段内只有1个线程可以读取和更新该值。 - Santiago Squarzon
1
好的,现在我明白了。谢谢 :) - Dennis
显示剩余16条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接