Task Parallel Library指定结果的WaitAny

13

我试图编写一些代码,以便对多个不同的服务器进行并行网络服务调用,所以TPL似乎是明显的选择。

我的多个网络服务调用中只有一个会返回我想要的结果,其他所有调用都不会。我正在尝试有效地使用Task.WaitAny,但仅在符合条件的第一个Task 返回时解除阻塞。

我尝试过使用WaitAny,但无法确定如何放置筛选器。我完成了这一步:

public void SearchServers()
{
    var servers = new[] {"server1", "server2", "server3", "server4"};
    var tasks = servers
                 .Select(s => Task<bool>.Factory.StartNew(server => CallServer((string)server), s))
                 .ToArray();

    Task.WaitAny(tasks); //how do I say "WaitAny where the result is true"?

    //Omitted: cancel any outstanding tasks since the correct server has been found
}

private bool CallServer(string server)
{
    //... make the call to the server and return the result ...
}

编辑:为了避免上面的内容引起任何混淆,我想做以下工作:

  1. 对于每个服务器,启动一个Task来检查它
  2. 等待直到有一个服务器返回true(最多只有一个服务器会返回true)
  3. 或者等待直到所有服务器都返回false,即没有匹配项。
4个回答

10
我能想到最好的方法是为每个任务指定一个ContinueWith,检查结果,如果为true,则取消其他任务。要取消任务,您可能需要使用CancellationToken
var tasks = servers
    .Select(s => Task.Run(...)
        .ContinueWith(t =>
            if (t.Result) {
                // cancel other threads
            }
        )
    ).ToArray();

更新:另一种解决方案是使用WaitAny直到正确的任务完成(但它有一些缺点,例如从列表中删除已完成的任务并从剩余的任务创建一个新数组是相当繁重的操作):

List<Task<bool>> tasks = servers.Select(s => Task<bool>.Factory.StartNew(server => CallServer((string)server), s)).ToList();

bool result;
do {
    int idx = Task.WaitAny(tasks.ToArray());
    result = tasks[idx].Result;
    tasks.RemoveAt(idx);
} while (!result && tasks.Count > 0);

// cancel other tasks

更新2:如今我会使用Rx来实现:

[Fact]
public async Task AwaitFirst()
{
    var servers = new[] { "server1", "server2", "server3", "server4" };
    var server = await servers
        .Select(s => Observable
            .FromAsync(ct => CallServer(s, ct))
            .Where(p => p)
            .Select(_ => s)
        )
        .Merge()
        .FirstAsync();
    output.WriteLine($"Got result from {server}");
}

private async Task<bool> CallServer(string server, CancellationToken ct)
{
    try
    {
        if (server == "server1")
        {
            await Task.Delay(TimeSpan.FromSeconds(1), ct);
            output.WriteLine($"{server} finished");
            return false;
        }
        if (server == "server2")
        {
            await Task.Delay(TimeSpan.FromSeconds(2), ct);
            output.WriteLine($"{server} finished");
            return false;
        }
        if (server == "server3")
        {
            await Task.Delay(TimeSpan.FromSeconds(3), ct);
            output.WriteLine($"{server} finished");
            return true;
        }
        if (server == "server4")
        {
            await Task.Delay(TimeSpan.FromSeconds(4), ct);
            output.WriteLine($"{server} finished");
            return true;
        }
    }
    catch(OperationCanceledException)
    {
        output.WriteLine($"{server} Cancelled");
        throw;
    }

    throw new ArgumentOutOfRangeException(nameof(server));
}

在我的机器上,测试用时3.32秒(这意味着它没有等待第四个服务器),并且我得到了以下输出:

server1 finished
server2 finished
server3 finished
server4 Cancelled
Got result from server3

我尝试了你的代码示例,但它似乎不完全符合我的要求。我不能使用WaitAny,因为它只在第一个Task完成时返回,即使服务器不是正确的。我也不能使用WaitAll,否则我必须等待所有任务完成,即使我已经找到了正确的任务。理想情况下,我希望做到“等待直到其中一个任务返回true或所有任务完成(即没有任何服务器匹配)”。有什么办法可以实现这一点吗? - Adam Rodger
我更新了我的回答,但现在我更喜欢@svick的回答。 - Johannes Egger
标记为答案,因为我不能使用@svick的答案,尽管它看起来非常好,因为我在.NET 4.0上。 - Adam Rodger
1
我又更新了我的答案,Rx 真是太棒了 :-) - Johannes Egger

4
您可以使用AsyncEx库中的OrderByCompletion()方法,该方法会在任务完成时返回它们。您的代码可能如下所示:
var tasks = servers
    .Select(s => Task.Factory.StartNew(server => CallServer((string)server), s))
    .OrderByCompletion();

foreach (var task in tasks)
{
    if (task.Result)
    {
        Console.WriteLine("found");
        break;
    }
    Console.WriteLine("not found yet");
}

// cancel any outstanding tasks since the correct server has been found

1
“task.Result” 是否会阻塞当前线程直到下一个任务完成?如果当前线程是 UI 线程,那么是否应该在另一个线程上执行该操作? - Johannes Egger
1
@Jasd 是的,它可以。但问题要求改进WaitAny(),同时也要阻塞。因此我假设这不是一个UI应用程序,或者它已经在单独的线程上运行。 - svick
看起来很整洁,但不幸的是我正在使用VS2010,所以我不能使用那个库。 - Adam Rodger
1
在这种情况下,您可以使用Jon Skeet的该方法代码,或者Stephen Toub的Interleaved(),后者略有不同(它返回IEnumerable<Task<Task>>)。 - svick
1
Jon Skeet的代码已经移动到这里 - Daniel Lidström

1
这里是基于svick答案的通用解决方案:
public static async Task<T> GetFirstResult<T>(
this IEnumerable<Func<CancellationToken, Task<T>>> taskFactories, 
Action<Exception> exceptionHandler,
Predicate<T> predicate)
{
    T ret = default(T);
    var cts = new CancellationTokenSource();
    var proxified = taskFactories.Select(tf => tf(cts.Token)).ProxifyByCompletion();
    int i;
    for (i = 0; i < proxified.Length; i++)
    {
        try
        {
            ret = await proxified[i].ConfigureAwait(false);
        }
        catch (Exception e)
        {
            exceptionHandler(e);
            continue;
        }
        if (predicate(ret))
        {
            break;
        }
    }

    if (i == proxified.Length)
    {
        throw new InvalidOperationException("No task returned the expected value");
    }
    cts.Cancel(); //we have our value, so we can cancel the rest of the tasks
    for (int j = i+1; j < proxified.Length; j++)
    {
        //observe remaining tasks to prevent process crash 
        proxified[j].ContinueWith(
         t => exceptionHandler(t.Exception), TaskContinuationOptions.OnlyOnFaulted)
                   .Forget();
    }
    return ret;
}

其中 ProxifyByCompletion 的实现如下:

public static Task<T>[] ProxifyByCompletion<T>(this IEnumerable<Task<T>> tasks)
{
    var inputTasks = tasks.ToArray();
    var buckets = new TaskCompletionSource<T>[inputTasks.Length];
    var results = new Task<T>[inputTasks.Length];
    for (int i = 0; i < buckets.Length; i++)
    {
        buckets[i] = new TaskCompletionSource<T>();
        results[i] = buckets[i].Task;
    }
    int nextTaskIndex = -1;
    foreach (var inputTask in inputTasks)
    {
        inputTask.ContinueWith(completed =>
        {
            var bucket = buckets[Interlocked.Increment(ref nextTaskIndex)];
            if (completed.IsFaulted)
            {
                Trace.Assert(completed.Exception != null);
                bucket.TrySetException(completed.Exception.InnerExceptions);
            }
            else if (completed.IsCanceled)
            {
                bucket.TrySetCanceled();
            }
            else
            {
                bucket.TrySetResult(completed.Result);
            }
        }, CancellationToken.None, 
           TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);
    }
    return results;
}

"

Forget 是一个空方法,用于抑制 CS4014 错误:

"
public static void Forget(this Task task) //suppress CS4014
{
}

1
使用Interlocked.CompareExchange可以做到这一点,只有一个任务能够写入serverReturnedData。
    public void SearchServers()
        {
            ResultClass serverReturnedData = null;
            var servers = new[] {"server1", "server2", "server3", "server4"};
            var tasks = servers.Select(s => Task<bool>.Factory.StartNew(server => 
            {
               var result = CallServer((string)server), s);
               Interlocked.CompareExchange(ref serverReturnedData, result, null);

            }).ToArray();

            Task.WaitAny(tasks); //how do I say "WaitAny where the result is true"?
        //
        // use serverReturnedData as you want.
        // 
        }

编辑:如Jasd所言,上述代码可能在变量serverReturnedData获得有效值之前返回(如果服务器返回空值,则可能会发生这种情况),为确保您可以将结果包装在自定义对象中。


Task.WaitAny(tasks); 之后,serverReturnedData 仍然可能会改变(因为其他任务将完成)。此外,第一个完成的任务不一定是返回 true 的任务。 - Johannes Egger
以上代码保证了如果第一个返回的任务返回空值,那么第一个非空值将被存储在本地变量中,是的,它之后可能会改变。但这可以通过将结果包装在自定义对象中轻松解决。 - DVD
好的。首先,bool是值类型,无法为null,因此您可能需要将初始状态和“Interlocked.CompareExchange”的第三个参数更改为false。此外,完成的第一个任务(在“Task.WaitAny(tasks);”中等待的任务)不能保证是返回true的任务。但是,我认为OP想要等待返回true的第一个任务。 - Johannes Egger
真的,我改了我的例子,没有注意到bool声明xD。第二个可能是我在阅读问题时的误解。 - DVD

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接