并行LINQ - 返回第一个返回的结果

4
我正在使用PLINQ运行一个测试串口,以确定它们是否是GPS设备的函数。
有些串口立即被发现是有效的GPS。在这种情况下,我希望完成测试的第一个串口是返回的。我不想等待其他结果。
我能用PLINQ来做到这一点吗,还是我必须安排一批任务并等待其中一个返回?

我正要发布这个问题,然后发现了这个。不幸的是,被接受的答案是不正确的。在4.0中这不可能吗?(我可以看到在4.5中是可能的)。 - bj0
4个回答

7

在这里,PLINQ 可能不太够用。虽然你可以使用 .First,但在 .NET 4 中,这将导致顺序运行,这与初衷相悖。(请注意,这个问题在 .NET 4.5 中得到了改善。)

然而,TPL 很可能是正确的答案。你可以为每个串口创建一个 Task<Location>,然后使用 Task.WaitAny 等待第一个成功的操作。

这提供了一种简单的方法来安排一堆“任务”,然后只使用第一个结果。


问题在于现在我需要为每个任务等待WaitAny,查看结果,如果是正面的结果则取消剩余的任务,如果是负面的则继续等待,同时仍要小心,在没有任务时不要等待。对于这样一个看似简单的任务来说,这是相当复杂的逻辑;难道没有更好的方法吗? - David Pfeffer
@DavidPfeffer 你可以将结果放入 BlockingCollection<T> 中,并让一个线程调用它的 GetConsumingEnumerable... 一旦获得“有效”的结果,触发取消令牌并使用它。 - Reed Copsey
如果没有任何端口是有效的,该怎么办? - David Pfeffer
@DavidPfeffer 当你到达结尾时,如果你什么都没找到,那么你就知道没有任何有效的东西了... - Reed Copsey

2

过去几天我一直在思考这个问题,但是我无法找到C# 4.0中内置的PLINQ方法来解决。这个问题的被接受答案使用FirstOrDefault,但它只有当完整的PLINQ查询完成后才会返回值,并且仍然返回(有序的)第一个结果。以下极端示例显示了此行为:

var cts = new CancellationTokenSource();
var rnd = new ThreadLocal<Random>(() => new Random());

var q = Enumerable.Range(0, 11).Select(x => x).AsParallel()
    .WithCancellation(cts.Token).WithMergeOptions( ParallelMergeOptions.NotBuffered).WithDegreeOfParallelism(10).AsUnordered()
    .Where(i => i % 2 == 0 )
    .Select( i =>
    {
        if( i == 0 )
            Thread.Sleep(3000);
        else
            Thread.Sleep(rnd.Value.Next(50, 100));
        return string.Format("dat {0}", i).Dump();
    });

cts.CancelAfter(5000);

// waits until all results are in, then returns first
q.FirstOrDefault().Dump("result");

我没有看到立即获取第一个可用结果的内置方法,但我想出了两种解决方法。
第一种方法是创建任务来完成工作并返回任务,这将导致快速完成PLINQ查询。生成的任务可以传递给WaitAny,以在第一个结果可用时立即获取它。
var cts = new CancellationTokenSource();
var rnd = new ThreadLocal<Random>(() => new Random());

var q = Enumerable.Range(0, 11).Select(x => x).AsParallel()
    .WithCancellation(cts.Token).WithMergeOptions( ParallelMergeOptions.NotBuffered).WithDegreeOfParallelism(10).AsUnordered()
    .Where(i => i % 2 == 0 )
    .Select( i =>
    {
        return Task.Factory.StartNew(() =>
        {
        if( i == 0 )
            Thread.Sleep(3000);
        else
            Thread.Sleep(rnd.Value.Next(50, 100));
        return string.Format("dat {0}", i).Dump();
        });
    });

cts.CancelAfter(5000);

// returns as soon as the tasks are created
var ts = q.ToArray();

// wait till the first task finishes
var idx = Task.WaitAny( ts );
ts[idx].Result.Dump("res");

这可能是一种糟糕的方法。由于PLINQ查询的实际工作只是非常快速的Task.Factory.StartNew,因此使用PLINQ毫无意义。在IEnumerable上简单地使用.Select(i => Task.Factory.StartNew(... 更加简洁且可能更快。

第二种解决方法使用队列(BlockingCollection),并在计算完成后将结果插入该队列:

var cts = new CancellationTokenSource();
var rnd = new ThreadLocal<Random>(() => new Random());

var q = Enumerable.Range(0, 11).Select(x => x).AsParallel()
    .WithCancellation(cts.Token).WithMergeOptions( ParallelMergeOptions.NotBuffered).WithDegreeOfParallelism(10).AsUnordered()
    .Where(i => i % 2 == 0 )
    .Select( i =>
    {
        if( i == 0 )
            Thread.Sleep(3000);
        else
            Thread.Sleep(rnd.Value.Next(50, 100));
        return string.Format("dat {0}", i).Dump();
    });

cts.CancelAfter(5000);

var qu = new BlockingCollection<string>();

// ForAll blocks until PLINQ query is complete
Task.Factory.StartNew(() => q.ForAll( x => qu.Add(x) ));

// get first result asap
qu.Take().Dump("result");

使用此方法,使用PLINQ完成工作,并且BlockingCollection的Take()将在PLINQ查询插入第一个结果时立即返回。虽然这会产生所需的结果,但我不确定它是否比仅使用更简单的Tasks + WaitAny有任何优势。

0
要在.NET 4.0中完全使用PLINQ来完成这个任务:
SerialPorts.                        // Your IEnumerable of serial ports
    AsParallel().AsUnordered().     // Run as an unordered parallel query
    Where(IsGps).                   // Matching the predicate IsGps (Func<SerialPort, bool>)
    Take(1).                        // Taking the first match
    FirstOrDefault();               // And unwrap it from the IEnumerable (or null if none are found

关键是在指定只需要找到一个之前,不要使用类似于First或FirstOrDefault的有序评估。

0

2
我没有看到这种行为。即使使用 AsUnordered,在 PLINQ 查询中,FirstOrDefault 仍然返回第一个输入的结果。 - bj0

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接