如何报告 PLINQ 查询的进度?

3

我希望能够报告长时间运行的PLINQ查询的进度。

我无法找到任何本地的LINQ方法来实现这一点(就像取消一样实现)。

我阅读了这篇文章,其中展示了一个适用于常规序列化查询的简洁扩展函数。

我一直在使用下面的代码进行测试行为。

var progress = new BehaviorSubject<int>(0);
DateTime start = DateTime.Now;
progress.Subscribe(x => { Console.WriteLine(x); });
Enumerable.Range(1,1000000)
    //.WithProgressReporting(i => progress.OnNext(i)) //Beginning Progress
    .AsParallel()
    .AsOrdered()
    //.WithProgressReporting(i => progress.OnNext(i)) //Middle Progress reporting
    .Select(v => { Thread.Sleep(1); return v * v; })
    //.WithProgressReporting(i => progress.OnNext(i)) //End Progress Reporting
    .ToList();
Console.WriteLine("Completed in: " + (DateTime.Now - start).TotalSeconds + " seconds");

编辑:
使用 IEnumerable<T> 扩展从中间报告进度会移除并行性。
结尾报告不会在并行计算正在进行时报告任何进度,然后在最后非常快地报告所有进度。我认为这是将并行计算的结果编译成列表的进度。
我最初认为从开始报告进度会导致LINQ未并行运行。经过一夜思考和阅读Peter Duniho的评论后,我发现它实际上是并行工作的,但我得到了如此多的进度报告,处理这么多进度报告会显著减慢我的测试/应用程序。
有没有一种并行/线程安全的方法以增量方式报告PLINQ的进度,以便用户知道正在取得进展,而对方法运行时间影响不大?

你的问题不是很清楚。为什么非并行的WithProgressReporting()方法不能满足你的需求?通常,你会从IEnumerable<T>开始...只需将源IEnumerable<T>包装在调用WithProgressReporting()的语句中,并在其中调用AsParallel(),就像你在测试中所做的那样。最终吞吐量将是相同的,无论你在源或结果上报告进度。你需要更具体:发布一个[mcve]并精确地解释你期望的输出以及你实际得到的输出。 - Peter Duniho
2个回答

1

这个答案可能不够优雅,但它能完成任务。

使用PLINQ时,有多个线程处理您的集合,因此使用这些线程来报告进度会导致多个(并且顺序不正确的)进度报告,例如0%1%5%4%3%等。

相反,我们可以使用这些多个线程来更新存储进度的共享变量。在我的示例中,它是一个名为completed的本地变量。然后,我们使用Task.Run()生成另一个线程以在0.5秒间隔内报告该进度变量。

扩展类:

static class Extensions
    public static ParallelQuery<T> WithProgressReporting<T>(this ParallelQuery<T> sequence, Action increment)
    {
        return sequence.Select(x =>
        {
            increment?.Invoke();
            return x;
        });
    }
}

代码:

static void Main(string[] args)
    {
        long completed = 0;
        Task.Run(() =>
        {
            while (completed < 100000)
            {
                Console.WriteLine((completed * 100 / 100000) + "%");
                Thread.Sleep(500);
            }
        });
        DateTime start = DateTime.Now;
        var output = Enumerable.Range(1, 100000)
            .AsParallel()
            .WithProgressReporting(()=>Interlocked.Increment(ref completed))
            .Select(v => { Thread.Sleep(1); return v * v; })
            .ToList();
        Console.WriteLine("Completed in: " + (DateTime.Now - start).TotalSeconds + " seconds");
        Console.ReadKey();
    }

1

这个扩展可以放置在LINQ查询的开头或结尾。如果放置在开头,将立即开始报告进度,但会在工作完成之前错误地报告100%。如果放置在结尾,将准确报告查询的结束,但会延迟报告进度,直到源的第一项完成。

public static ParallelQuery<TSource> WithProgressReporting<TSource>(
    this ParallelQuery<TSource> source,
    long itemsCount, IProgress<double> progress)
{
    int countShared = 0;
    return source.Select(item =>
    {
        int countLocal = Interlocked.Increment(ref countShared);
        progress.Report(countLocal / (double)itemsCount);
        return item;
    });
}

使用示例:

// The Progress captures the current SynchronizationContext at construction.
var progress = new Progress<double>(); 
progress.ProgressChanged += (object sender, double e) =>
{
    Console.WriteLine($"Progress: {e:0%}");
};

var numbers = Enumerable.Range(1, 10);

var sum = numbers
    .AsParallel()
    .WithDegreeOfParallelism(3)
    .WithMergeOptions(ParallelMergeOptions.NotBuffered)
    .Select(n => { Thread.Sleep(500); return n; }) // Simulate some heavy computation
    .WithProgressReporting(10, progress) // <--- the extension method
    .Sum();

Console.WriteLine($"Sum: {sum}");

输出:

Query output

有时候工作线程会抢占彼此,导致一些跳跃。

System.Progress<T> 类具有一个很好的特性,它在捕获的上下文(通常是 UI 线程)中调用 ProgressChanged 事件,因此可以安全地更新 UI 控件。另一方面,在控制台应用程序中,该事件在 ThreadPool 上被调用,而 ThreadPool 可能会被并行查询完全利用,因此事件将会有一些延迟(ThreadPool 每 500 毫秒生成新线程)。这就是我在示例中将并行度限制为 3 的原因,以保留一个空闲线程用于进度报告(我有一台四核机器)。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接