我正在使用响应式扩展与async/await相结合,以简化我的套接字协议实现。 当特定消息到达时必须执行某些操作(例如向每个“ping”发送“pong”),还有一些方法需要异步等待某些特定响应。 以下示例说明了这一点:
private Subject<string> MessageReceived = new Subject<string>();
//this method gets called every time a message is received from socket
internal void OnReceiveMessage(string message)
{
MessageReceived.OnNext(message);
ProcessMessage(message);
}
public async Task<string> TestMethod()
{
var expectedMessage = MessageReceived.Where(x => x.EndsWith("D") && x.EndsWith("F")).FirstOrDefaultAsync();
await SendMessage("ABC");
//some code...
//if response we are waiting for comes before next row, we miss it
return await expectedMessage;
}
TestMethod()会将"ABC"发送到套接字,并在接收到例如"DEF"之后继续(在此之前可能会有其他一些消息)。
这个方法几乎可以工作,但存在竞态条件。似乎这段代码只有在return await expectedMessage;
之后才会监听消息。这是一个问题,因为有时消息会在此之前到达。
expectedMessage
时,pong 没有被发送?如果是这样,那么你应该在OnReceiveMessage
中处理ping
消息,而不是使用MessageReceived.OnNext()
存储它。 - Jeroen van Langen