目前,单个回测能够很好地发送N个单元测试,每个测试都有数千次请求和捕获。我的问题在于架构;当我分派另一个回测(跟随第一个)时,由于轮询线程未被取消和处理,事件订阅会再次进行,导致输出错误。这可能看起来像是微不足道的问题(也许对你们中的一些人来说是这样),但在当前配置下取消此轮询任务证明是麻烦的。一些代码...
我的消息代理类很简单,如下所示:
public class MessageBroker : IMessageBroker<Taurus.FeedMux>, IDisposable
{
private Task pollingTask;
private NetMQContext context;
private PublisherSocket pubSocket;
private CancellationTokenSource source;
private CancellationToken token;
private ManualResetEvent pollerCancelled;
public MessageBroker()
{
this.source = new CancellationTokenSource();
this.token = source.Token;
StartPolling();
context = NetMQContext.Create();
pubSocket = context.CreatePublisherSocket();
pubSocket.Connect(PublisherAddress);
}
public void Dispatch(Taurus.FeedMux message)
{
pubSocket.Send(message.ToByteArray<Taurus.FeedMux>());
}
private void StartPolling()
{
pollerCancelled = new ManualResetEvent(false);
pollingTask = Task.Run(() =>
{
try
{
using (var context = NetMQContext.Create())
using (var subSocket = context.CreateSubscriberSocket())
{
byte[] buffer = null;
subSocket.Options.ReceiveHighWatermark = 1000;
subSocket.Connect(SubscriberAddress);
subSocket.Subscribe(String.Empty);
while (true)
{
buffer = subSocket.Receive();
MessageRecieved.Report(buffer.ToObject<Taurus.FeedMux>());
if (this.token.IsCancellationRequested)
this.token.ThrowIfCancellationRequested();
}
}
}
catch (OperationCanceledException)
{
pollerCancelled.Set();
}
}, this.token);
}
private void CancelPolling()
{
source.Cancel();
pollerCancelled.WaitOne();
pollerCancelled.Close();
}
public IProgress<Taurus.FeedMux> MessageRecieved { get; set; }
public string PublisherAddress { get { return "tcp://127.X.X.X:6500"; } }
public string SubscriberAddress { get { return "tcp://127.X.X.X:6501"; } }
private bool disposed = false;
protected virtual void Dispose(bool disposing)
{
if (!disposed)
{
if (disposing)
{
if (this.pollingTask != null)
{
CancelPolling();
if (this.pollingTask.Status == TaskStatus.RanToCompletion ||
this.pollingTask.Status == TaskStatus.Faulted ||
this.pollingTask.Status == TaskStatus.Canceled)
{
this.pollingTask.Dispose();
this.pollingTask = null;
}
}
if (this.context != null)
{
this.context.Dispose();
this.context = null;
}
if (this.pubSocket != null)
{
this.pubSocket.Dispose();
this.pubSocket = null;
}
if (this.source != null)
{
this.source.Dispose();
this.source = null;
}
}
disposed = true;
}
}
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
~MessageBroker()
{
Dispose(false);
}
}
回测“引擎”用于执行每个回测,首先构建一个包含每个“测试”(单元测试)和要分派到 C++ 应用程序的消息的字典。
这里是 DispatchTests
方法:
private void DispatchTests(ConcurrentDictionary<Test, List<Taurus.FeedMux>> feedMuxCollection)
{
broker = new MessageBroker();
broker.MessageRecieved = new Progress<Taurus.FeedMux>(OnMessageRecieved);
testCompleted = new ManualResetEvent(false);
try
{
// Loop through the tests.
foreach (var kvp in feedMuxCollection)
{
testCompleted.Reset();
Test t = kvp.Key;
t.Bets = new List<Taurus.Bet>();
foreach (Taurus.FeedMux mux in kvp.Value)
{
token.ThrowIfCancellationRequested();
broker.Dispatch(mux);
}
broker.Dispatch(new Taurus.FeedMux()
{
type = Taurus.FeedMux.Type.PING,
ping = new Taurus.Ping() { event_id = t.EventID }
});
testCompleted.WaitOne(); // Wait until all messages are received for this test.
}
testCompleted.Close();
}
finally
{
broker.Dispose(); // Dispose the broker.
}
}
在结尾处的
PING
信息是告诉C++我们已经完成了。然后我们强制等待,以便在从C++代码接收到所有返回之前不会分派下一个[unit]测试 - 我们使用ManualResetEvent
来实现这一点。当C++接收到PING消息时,它会立即将该消息发送回来。我们通过
OnMessageRecieved
处理接收到的消息,并且PING告诉我们设置ManualResetEvent.Set()
,以便我们可以继续进行单元测试;“接下来请”...private async void OnMessageRecieved(Taurus.FeedMux mux)
{
string errorMsg = String.Empty;
if (mux.type == Taurus.FeedMux.Type.MSG)
{
// Do stuff.
}
else if (mux.type == Taurus.FeedMux.Type.PING)
{
// Do stuff.
// We are finished reciving messages for this "unit test"
testCompleted.Set();
}
}
我的问题是,上面的finally中的
broker.Dispose()
从未被执行。我知道在后台线程上执行的finally块不能保证被执行。上面划掉的文字是因为我搞乱了代码;我停止了父线程,而子线程还没有完成。然而,仍然存在问题...
现在
broker.Dispose()
被正确调用,并且在这个方法中,我尝试取消轮询线程并正确地处理Task
,以避免多次订阅。为了取消线程,我使用
CancelPolling()
方法。private void CancelPolling()
{
source.Cancel();
pollerCancelled.WaitOne(); <- Blocks here waiting for cancellation.
pollerCancelled.Close();
}
但是在StartPolling()
方法中
while (true)
{
buffer = subSocket.Receive();
MessageRecieved.Report(buffer.ToObject<Taurus.FeedMux>());
if (this.token.IsCancellationRequested)
this.token.ThrowIfCancellationRequested();
}
ThrowIfCancellationRequested()
从未被调用,线程也从未被取消,因此无法正确处理。轮询线程被subSocket.Receive()
方法阻塞。
现在,我不清楚如何实现我的目标,我需要在除了用于轮询消息的线程之外的线程上调用broker.Dispose()
/PollerCancel()
并强制取消。无论如何,我都不想以任何代价使用线程中止。
基本上,在执行下一个回测之前,我希望正确处理broker
的处理,我该如何正确处理这个问题?将轮询拆分并在单独的应用程序域中运行可以吗?
我已经尝试在OnMessageRecived
处理程序中进行处理,但这显然在与轮询相同的线程上执行,并且不是正确的处理方式,没有调用其他线程,它会被阻塞。
最佳实践是什么,有没有相关的模式可供参考?
感谢您的时间。
broker.Dispose()
从未被执行。但是为什么呢?你有进行调试并查看原因吗? - Sriram Sakthivel