UdpClient.ReceiveAsync 正确的早期终止

3

你好。我使用UdpClient和其封装相关工作。

读取方面,我有一个异步方法:

private async Task<byte[]> Receive(UdpClient client, CancellationToken breakToken)
{
    // Выход из async, если произошёл CancellationRequest
    breakToken.ThrowIfCancellationRequested();

    UdpReceiveResult result;
    try
    {
        result = await client.ReceiveAsync().WithCancellation(breakToken);
    }
    catch(OperationCanceledException)
    {
        // Штатная ситуация ручной остановки Task-а
    }

    return result.Buffer;
}

这里的WithCancellation是我的扩展方法,用于提前终止操作:

public static async Task<T> WithCancellation<T>(
    this Task<T> task, CancellationToken cancellationToken)
{
    var tcs = new TaskCompletionSource<bool>();

    using (cancellationToken.Register(
        s => ((TaskCompletionSource<bool>)s).TrySetResult(true),
        tcs))
        if (task != await Task.WhenAny(task, tcs.Task))
            throw new OperationCanceledException(cancellationToken);

    return await task;
}

在手动阅读停止后,当我调用Dispose时,会出现System.ObjectDisposedException异常。 CallStack

>   System.dll!System.Net.Sockets.UdpClient.EndReceive(System.IAsyncResult asyncResult, ref System.Net.IPEndPoint remoteEP) Unknown
System.dll!System.Net.Sockets.UdpClient.ReceiveAsync.AnonymousMethod__64_1(System.IAsyncResult ar)  Unknown
mscorlib.dll!System.Threading.Tasks.TaskFactory<System.Net.Sockets.UdpReceiveResult>.FromAsyncCoreLogic(System.IAsyncResult iar, System.Func<System.IAsyncResult, System.Net.Sockets.UdpReceiveResult> endFunction, System.Action<System.IAsyncResult> endAction, System.Threading.Tasks.Task<System.Net.Sockets.UdpReceiveResult> promise, bool requiresSynchronization)   Unknown
mscorlib.dll!System.Threading.Tasks.TaskFactory<System.Net.Sockets.UdpReceiveResult>.FromAsyncImpl.AnonymousMethod__0(System.IAsyncResult iar)  Unknown
System.dll!System.Net.LazyAsyncResult.Complete(System.IntPtr userToken) Unknown
System.dll!System.Net.ContextAwareResult.CompleteCallback(object state) Unknown
mscorlib.dll!System.Threading.ExecutionContext.RunInternal(System.Threading.ExecutionContext executionContext, System.Threading.ContextCallback callback, object state, bool preserveSyncCtx)   Unknown
mscorlib.dll!System.Threading.ExecutionContext.Run(System.Threading.ExecutionContext executionContext, System.Threading.ContextCallback callback, object state, bool preserveSyncCtx)   Unknown
mscorlib.dll!System.Threading.ExecutionContext.Run(System.Threading.ExecutionContext executionContext, System.Threading.ContextCallback callback, object state) Unknown
System.dll!System.Net.ContextAwareResult.Complete(System.IntPtr userToken)  Unknown
System.dll!System.Net.LazyAsyncResult.ProtectedInvokeCallback(object result, System.IntPtr userToken)   Unknown
System.dll!System.Net.Sockets.BaseOverlappedAsyncResult.CompletionPortCallback(uint errorCode, uint numBytes, System.Threading.NativeOverlapped* nativeOverlapped)  Unknown
mscorlib.dll!System.Threading._IOCompletionCallback.PerformIOCompletionCallback(uint errorCode, uint numBytes, System.Threading.NativeOverlapped* pOVERLAP) Unknown

如果我理解正确的话,错误的根源在于ReceiveAsync,确切地说是在我的停止方法中。但我不知道如何修复它。
我该怎么做才能纠正这个错误?

在用户评论后更新:

private async Task<byte[]> Receive(UdpClient client, CancellationToken breakToken)
{
    // Выход из async, если произошёл CancellationRequest
    breakToken.ThrowIfCancellationRequested();

    UdpReceiveResult result;
    try
    {
        result = await client.ReceiveAsync().WithCancellation(breakToken);
    }
    catch(OperationCanceledException)
    {
        // Штатная ситуация ручной остановки Task-а
    }
    catch(ObjectDisposedException) { }

    return result.Buffer;
}

以及Dispose的调用:

public void Dispose()
{
    this.cancelRecieve?.Cancel();
    this.cancelRecieve?.Dispose();

    try
    {
        this.client?.Close();
    }
    catch(ObjectDisposedException) { }
}

但是catch不会对ObjectDisposedException做出反应。

2个回答

5

经过将近一周的痛苦,我终于找到了原因和解决方法。

起初,我查看了UdpClient源代码。其中的ReceiveAsync方法:

[HostProtection(ExternalThreading = true)]
public Task<UdpReceiveResult> ReceiveAsync()
{
    return Task<UdpReceiveResult>.Factory.FromAsync((callback, state) => BeginReceive(callback, state), (ar)=>
        {
            IPEndPoint remoteEP = null;
            Byte[] buffer = EndReceive(ar, ref remoteEP);
            return new UdpReceiveResult(buffer, remoteEP);

        }, null);
}

其次,我发现这篇帖子有一个完美的答案:如何中止套接字的BeginReceive()方法?,其中提到:

要取消对BeginConnect()方法的挂起调用,请关闭Socket。当在异步操作正在进行时调用Close()方法时,将调用提供给BeginConnect()方法的回调函数。对EndConnect(IAsyncResult)方法的后续调用将抛出ObjectDisposedException以指示该操作已被取消。

我们可以看到,原始的ReceiveAsync方法返回了ObjectDisposedException,因为在调用Close之后,IOOperation尚未完成。

为了解决这个问题,我做了以下操作:

新的ReceiveAsync实现如下:

/// <summary>
/// Асинхронный запрос на ожидание приёма данных с возможностью досрочного выхода
/// (для выхода из ожидания вызовите метод Disconnect())
/// </summary>
/// <param name="client">Рабочий экземпляр класса UdpClient</param>
/// <param name="breakToken">Признак досрочного завершения</param>
/// <returns>Если breakToken произошёл до вызова данного метода или в режиме ожидания
/// ответа, вернёт пустой UdpReceiveResult; при удачном получении ответа-результат
/// асинхронной операции чтения</returns>
public Task<UdpReceiveResult> ReceiveAsync(UdpClient client, CancellationToken breakToken)
    => breakToken.IsCancellationRequested
        ? Task<UdpReceiveResult>.Run(() => new UdpReceiveResult())
        : Task<UdpReceiveResult>.Factory.FromAsync(
            (callback, state) => client.BeginReceive(callback, state),
            (ar) =>
                {
                    /// Предотвращение <exception cref="ObjectDisposedException"/>
                    if (breakToken.IsCancellationRequested)
                        return new UdpReceiveResult();

                    IPEndPoint remoteEP = null;
                    var buffer = client.EndReceive(ar, ref remoteEP);
                    return new UdpReceiveResult(buffer, remoteEP);
                },
            null);

新的Dispose实现:

protected virtual void Dispose(bool disposing)
{
    if (disposing)
    {
        this.cancelReceive?.Cancel();
        this.client?.Close();
        this.cancelReceive?.Dispose();
    }
}

我非常希望,我的决定能够避免其他人经历我所经历的痛苦。


1
这个代码没有在所有情况下调用所需的EndReceive。如果你让它调用EndReceive,你又会收到ObjectDisposedException。但是我已经解释了如何处理这个异常。你原来的代码很好,除了这个错误的异常。 - usr
1
文档说明必须调用它。不调用它是使用错误。实际上,它可能会泄漏内存。捕获异常以处理它是完全可以的。在这里什么也不做是适当的。 - usr
1
这并不是有保障的。而且它似乎没有显示没有泄漏。此外,没有理由做任何这些事情。新的ReceiveAsync代码更加复杂。你只需要catch(ODE)就可以了。 - usr
1
框架本身是正确的,根据文档您正在错误地使用它。 - usr
2
关闭是正确的,但您必须始终调用End方法。答案是错误的,原因与我已经解释的相同:不能保证这样做。答案依赖于反编译当前代码。可能随时会出现问题。文档说明“必须通过调用EndReceive方法来完成异步BeginReceive操作”。 - usr
显示剩余5条评论

0
唯一取消挂起接收的方法是像您所做的那样断开/停止/处理连接。这是正确的。您需要捕获并忽略该异常。
这是.NET Framework的一个不幸的设计问题,这是唯一的方法。
请注意,WithCancellation无法取消IO。 接收仍在运行。这就是为什么WithCancellation必须跟随套接字的处理以确保没有其他挂起的IO。

但是如果我只调用 Dispose(删除 Receive 中的 .WithCancellation(breakToken)),这个对象将不会被处理。 而且我无法捕获 ObjectDisposedException,我已经尝试过了... - EgoPingvina
不确定您的意思。您所拥有的代码很好(假设超时时也会关闭套接字)。但是您需要吞掉ObjectDisposedException异常。 - usr
尝试 { result = await client.ReceiveAsync().WithCancellation(breakToken); } catch(OperationCanceledException) { // 手动停止任务的正常情况 } catch(ObjectDisposedException) {}和try { this.client?.Close(); } catch(ObjectDisposedException) {}都没有捕获这个异常。 - EgoPingvina
你是否正在发出breakToken信号?如果是这样,接收任务将被丢弃。接收任务是使用ObjeDispEx故障的任务。 - usr
我刚告诉你为什么:在那种情况下,Receive任务会被丢弃。Receive任务是使用ObjeDispEx失败的任务。那怎么样? - usr
显示剩余8条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接