使用async/await、RX和LINQ进行异步消息处理

3

我正在使用响应式扩展与async/await相结合,以简化我的套接字协议实现。 当特定消息到达时必须执行某些操作(例如向每个“ping”发送“pong”),还有一些方法需要异步等待某些特定响应。 以下示例说明了这一点:

private Subject<string> MessageReceived = new Subject<string>();

//this method gets called every time a message is received from socket
internal void OnReceiveMessage(string message)
{
    MessageReceived.OnNext(message);
    ProcessMessage(message);
}

public async Task<string> TestMethod()
{
    var expectedMessage = MessageReceived.Where(x => x.EndsWith("D") && x.EndsWith("F")).FirstOrDefaultAsync();
    await SendMessage("ABC");

    //some code...

    //if response we are waiting for comes before next row, we miss it
    return await expectedMessage;
}

TestMethod()会将"ABC"发送到套接字,并在接收到例如"DEF"之后继续(在此之前可能会有其他一些消息)。

这个方法几乎可以工作,但存在竞态条件。似乎这段代码只有在return await expectedMessage;之后才会监听消息。这是一个问题,因为有时消息会在此之前到达。


你的意思是当等待 expectedMessage 时,pong 没有被发送?如果是这样,那么你应该在 OnReceiveMessage 中处理 ping 消息,而不是使用 MessageReceived.OnNext() 存储它。 - Jeroen van Langen
实际上,我在示例中省略了那部分。它正常工作。在ProcessMessages()方法中,我会对ping消息发送pong作为响应,但问题是在异步方法中等待特定事件(从套接字接收到的消息)。 - Juha
1个回答

5

FirstOrDefaultAsync 在这里不适用:它直到await行才订阅,这会留下竞态条件(正如你指出的那样)。以下是您可以替换它的方法:

    var expectedMessage = MessageReceived
        .Where(x => x.EndsWith("D") && x.EndsWith("F"))
        .Take(1)
        .Replay(1)
        .RefCount();

    using (var dummySubscription = expectedMessage.Subscribe(i => {}))
    {
        await SendMessage("ABC");

        //Some code... goes here.

        return await expectedMessage;
    }

.Replay(1) 确保新的订阅获取最近的条目,假设存在一个。但是,只有在有订阅者监听时才起作用,因此需要 dummySubscription


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接