有没有比"fire and forget"更好、更可靠的模式来同时处理异步任务的可变数量?

3
我有以下代码:
while (!cancellationToken.IsCancellationRequested)
{
    var connection = await listener.AcceptAsync(cancellationToken);

    HandleConnectionAsync(connection, cancellationToken)
        .FireAndForget(HandleException);
}

FireAndForget是一个扩展方法:

public static async void FireAndForget(this ValueTask task, Action<Exception> exceptionHandler)
{
    try
    {
        await task.ConfigureAwait(false);
    }
    catch (Exception e)
    {
        exceptionHandler.Invoke(e);
    }
}
while loop 是服务器生命周期。当新的连接被接受后,它会启动一些“后台任务”,以便处理这个新连接,然后while loop返回到接受新连接的状态而不需要等待任何东西 - 暂停生命周期。
我在这里不能使用await HandleConnectionAsync(暂停生命周期),因为我想立即接受另一个连接(如果有的话)并能够同时处理多个连接。HandleConnectionAsync是I/O绑定的,并且一次只处理一个连接,直到关闭(任务完成后的某个时间)。
连接必须分别处理 - 我不希望处理一个连接时出现任何错误影响其他连接的情况。
这里所述的“fire and forget”解决方案可行,但通常规则是始终await异步方法,永远不要使用async void
看起来我打破了规则,所以在这种情况下有更好、更可靠的方法来同时处理变量(任务数量随时间变化)数量的异步I/O绑定任务吗?
更多信息:
- 每次调用AcceptAsync都会分配系统资源,甚至在返回连接之前,我希望尽可能避免这种情况(连接可能不会被返回几个小时(代码可能会“等待”几个小时)- 直到某个外部客户端决定连接到我的服务器)。最好假设这是我不希望同时/并行调用的方法 - 一次只有一个AcceptAsync足够了。 - 请考虑到我每天可以有数百万个客户端连接和断开到我的服务器,服务器(while loop)可以工作很多天。 - 我不知道在特定时间需要处理多少个连接。 - 我知道我的程序能够同时处理的最大连接数。 - 如果达到了maximum number of connections限制,则AcceptAsync不会返回新连接,直到其他活动连接关闭,因此我不需要担心这个问题,但任何基于该限制的解决方案都必须考虑到活动连接可能关闭并且仍然需要处理新连接 - 连接数量随时间变化。 "fire and forget"对此没有问题。 - 关于HandleConnectionAsync的代码不相关-它只处理一个连接,直到关闭(任务完成后的某个时间),并且是I/O绑定的(HandleConnectionAsync一次只处理一个连接,但是我们当然可以启动多个HandleConnectionAsync任务来同时处理多个连接 - 这就是我用“fire and forget”做的)。

2
Task.WaitAll? - NotFound
@404 我想你的意思是 Task.WhenAll - 我不确定如何在这里应用它,因为我没有固定的任务集合。正如我在“更多信息”部分中提到的那样,我不知道特定时间的连接数量 - 它可以是一个、两个或1000个,并且可能会随时间变化。 - user6440521
你也可以使用 Task.WhenAll 来捆绑它们,但最终如果你不想“发射并忘记”,你必须在某个时刻等待它们完成。无论是使用捆绑任务还是 Task.WaitAll - NotFound
3个回答

3

我假设更改为类似SignalR的解决方案是不可接受的。这将是我的首要建议。

自定义服务器套接字是一种可以接受某种“fire and forget”(即无需返回结果)的场景。我考虑向AsyncEx添加“任务管理器”类型,以使此类解决方案更加容易,但尚未完成。

关键是您需要自行管理连接列表。 "连接"对象可以包含表示处理循环的Task;这是可以的。还有其他属性也很有用(特别是为了调试或管理目的),例如远程IP。

因此,我会采用以下方法:

private readonly object _mutex = new object();
private readonly List<State> _connections = new List<State>();

private void Add(State state)
{
  lock (_mutex)
    _connections.Add(state);
}
private void Remove(State state)
{
  lock (_mutex)
    _connections.Remove(state);
}

public async Task RunAsync(CancellationToken cancellationToken)
{
  while (true)
  {
    var connection = await listener.AcceptAsync(cancellationToken);

    Add(new State(this, connection));
  }
}

private sealed class State
{
  private readonly Parent _parent;

  public State(Parent parent, Connection connection, CancellationToken cancellationToken)
  {
    _parent = parent;
    Task = ExecuteAsync(connection, cancellationToken);
  }

  private static async Task ExecuteAsync(Connection connection, CancellationToken cancellationToken)
  {
    try { await HandleConnectionAsync(connection, cancellationToken); }
    finally { _parent.Remove(this); }
  }

  public Task Task { get; }
  // other properties as desired, e.g., RemoteAddress
}

现在您拥有了一组连接。您可以像上面的代码一样忽略State对象中的任务,就像“fire-and-forget”(发送并忘记)一样。或者您可以在某个时候await所有这些任务。例如:

public async Task RunAsync(CancellationToken cancellationToken)
{
  try
  {
    while (true)
    {
      var connection = await listener.AcceptAsync(cancellationToken);

      Add(new State(this, connection));
    }
  }
  catch (OperationCanceledException)
  {
    // Wait for all connections to cancel.
    // I'm not really sure why you would *want* to do this, though.
    List<State> connections;
    lock (_mutex) { connections = _connections.ToList(); }
    await Task.WhenAll(connections.Select(x => x.Task));
  }
}

那么扩展 State 对象以便执行服务器应用程序的一些有用操作就很容易了,例如:

  • 列出此服务器连接到的所有远程地址。
  • 等待特定连接完成。
  • ...

注:

  • 使用一种取消模式。传递令牌将导致抛出 OperationCanceledException,这是正常的取消模式。代码曾经还进行了 while (!IsCancellationRequested) 操作,这会在取消时成功完成,这不是正常的取消模式。因此,我将其删除,所以代码不再使用两个取消模式。
  • 当使用原始套接字时,在一般情况下,您需要不断地读取(即使在写入时)和定期写入(即使没有数据要发送)。因此,您的 HandleConnectionAsync 应该启动异步读取器和写入器,然后使用 Task.WhenAll
  • 我删除了对 HandleException 的调用,因为(可能)它所做的任何事情都应该由 State.ExecuteAsync 处理。如果需要,很容易将其添加回来。

感谢您的详细回答。只是有一个小问题:将“从套接字读取”公开为C# 8异步可枚举类型是否是一个非常糟糕的想法?我开发了一个API,其中连接对象可以返回IAsyncEnumerable<TMessage>,其中TMessage是从底层的NetworkStream重新组装而成的,因此,这个API的使用者将通过使用await foreach枚举隐式地读取它,我不确定这是否是一个好主意,以及如何使用“异步可枚举”满足“不断读取”的规则。在代码中看起来很不错,因为它感觉就像枚举列表一样。 - user6440521
1
好的,不用管“不断读取”的问题了。我刚意识到这和“正常”的读写情况是一样的——我必须创建单独的任务来枚举和发送。 - user6440521

1
如果允许的并发任务数量有限制,您应该使用SemaphoreSlim:
int allowedConcurrent = //..
var semaphore = new SemaphoreSlim(allowedConcurrent);
var tasks = new List<Task>();

while (!cancellationToken.IsCancellationRequested)
{
    Func<Task> func = async () =>
    {
        var connection = await listener.AcceptAsync(cancellationToken);
        await HandleConnectionAsync(connection, cancellationToken);

        semaphore.Release();
    };

    await semaphore.WaitAsync(); // Will return immediately if the number of concurrent tasks does not exceed allowed

    tasks.Add(func());
}

await Task.WhenAll(tasks);

这将把任务累积到一个列表中,然后使用Task.WhenAll等待它们全部完成。

这是一个不错的解决方案,但如果每次调用 AcceptAsync 都会分配系统资源,并且我想尽可能避免这种情况怎么办?最好假设这是我不希望被并发/同时调用的方法 - 我只能一次接受一个连接。 - user6440521
@BART 所以你想要在多次调用 HandleConnectionAsync 时重复使用连接?你可以在 while 循环之前始终执行 var connection = await listener.AcceptAsync - Johnathan Barclay
“while循环”是服务器生命周期。当新连接被接受时,它会启动一些“后台任务”,以便处理这个新连接,然后“while循环”会回到接受新连接的状态,而不需要等待任何东西-暂停生命周期。使用您的方法,我必须调用多个“AcceptAsync”-最多达到“allowedConcurrent”,即使当前没有客户端连接到服务器,也会消耗系统资源。它也不允许我在完成旧连接后接受任何新连接-“fire and forget”没有这个问题。 - user6440521
@BART,我不完全确定你想表达什么意思。每个“connection”在离开作用域时都会被垃圾回收,就像使用fire and forget一样。 - Johnathan Barclay
我认为关键是我必须改进我的问题。AcceptAsync已经内置了信号量,不允许我接受超过允许的连接数 - 问题不在于限流。这基本上将上面的代码简化为在服务器循环之外保留任务列表并在服务器关闭时等待它们。在这种情况下,任务列表将无限增长,因为我每天可以有数百万个客户端连接和断开与我的服务器 - 服务器可以工作多天。此外,AcceptAsync在返回连接之前分配系统资源。 - user6440521
这只涉及到AcceptAsync - 它可能会等待数小时才返回连接(代码正在等待数小时),直到某个客户端决定连接到我的服务器,但启动/调用AcceptAsync任务会分配资源 - 这就是为什么我不想在不需要时“等待/启动”它 - 一次只需要一个'AcceptAsync'就足够了。 - user6440521

0

首先:

  1. 不要使用异步 void...

然后,您可以为此实现生产者/消费者模式,下面的伪代码仅供参考,您需要确保您的消费者是应用程序中的单例。

public class Data
{
    public Uri Url { get; set; }
}

public class Producer
{
    private Consumer _consumer = new Consumer();
    public void DoStuff()
    {
        var data = new Data();
        _consumer.Enqueue(data);
    }
}

public class Consumer
{
    private readonly List<Data> _toDo = new List<Data>();
    private bool _stop = false;
    public Consumer()
    {
        Task.Factory.StartNew(Loop);
    }

    private async Task Loop()
    {
        while (!_stop)
        {
            Data toDo = null;
            lock (_toDo)
            {
                if (_toDo.Any())
                {
                    toDo = _toDo.First();
                    _toDo.RemoveAt(0);
                }
            }

            if (toDo != null)
            {
                await DoSomething(toDo);
            }

            Thread.Sleep(TimeSpan.FromSeconds(1));
        }
    }

    private async Task DoSomething(Data toDo)
    {
        // YOUR ASYNC STUFF HERE
    }

    public void Enqueue(Data data)
    {
        lock (_toDo)
        {
            _toDo.Add(data);
        }
    }
}

所以你的调用方法生成了你需要执行的后台任务,而消费者执行该任务,这是另一个“fire and forget”(即发出请求后不等待响应直接返回)。 你还应该考虑如果应用程序出现问题会发生什么,是否应该将数据存储在Consumer.Enqueue()中,以便应用程序重新启动时可以完成缺失的工作... 希望这能帮到你。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接