使用多线程轮询取消的ZeroMQ PUB/SUB模式

18
我有两个应用程序,一个是C++服务器,另一个是C# WPF UI。C++代码通过ZeroMQ消息[PUB/SUB]服务接收来自任何地方/任何人的请求。我使用我的C#代码进行回测并创建“回测”,然后执行它们。这些回测可以由许多“单元测试”组成,每个测试从C++服务器发送/接收数千条消息。
目前,单个回测能够很好地发送N个单元测试,每个测试都有数千次请求和捕获。我的问题在于架构;当我分派另一个回测(跟随第一个)时,由于轮询线程未被取消和处理,事件订阅会再次进行,导致输出错误。这可能看起来像是微不足道的问题(也许对你们中的一些人来说是这样),但在当前配置下取消此轮询任务证明是麻烦的。一些代码...
我的消息代理类很简单,如下所示:
public class MessageBroker : IMessageBroker<Taurus.FeedMux>, IDisposable
{
    private Task pollingTask;
    private NetMQContext context;
    private PublisherSocket pubSocket;

    private CancellationTokenSource source;
    private CancellationToken token;
    private ManualResetEvent pollerCancelled;

    public MessageBroker()
    {
        this.source = new CancellationTokenSource();
        this.token = source.Token;

        StartPolling();
        context = NetMQContext.Create();
        pubSocket = context.CreatePublisherSocket();
        pubSocket.Connect(PublisherAddress);
    }

    public void Dispatch(Taurus.FeedMux message)
    {
        pubSocket.Send(message.ToByteArray<Taurus.FeedMux>());
    }

    private void StartPolling()
    {
        pollerCancelled = new ManualResetEvent(false);
        pollingTask = Task.Run(() =>
        {
            try
            {
                using (var context = NetMQContext.Create())
                using (var subSocket = context.CreateSubscriberSocket())
                {
                    byte[] buffer = null;
                    subSocket.Options.ReceiveHighWatermark = 1000;
                    subSocket.Connect(SubscriberAddress);
                    subSocket.Subscribe(String.Empty);
                    while (true)
                    {
                        buffer = subSocket.Receive();
                        MessageRecieved.Report(buffer.ToObject<Taurus.FeedMux>());
                        if (this.token.IsCancellationRequested)
                            this.token.ThrowIfCancellationRequested();
                    }
                }
            }
            catch (OperationCanceledException)
            {
                pollerCancelled.Set();
            }
        }, this.token);
    }

    private void CancelPolling()
    {
        source.Cancel();
        pollerCancelled.WaitOne();
        pollerCancelled.Close();
    }

    public IProgress<Taurus.FeedMux> MessageRecieved { get; set; }
    public string PublisherAddress { get { return "tcp://127.X.X.X:6500"; } }
    public string SubscriberAddress { get { return "tcp://127.X.X.X:6501"; } }

    private bool disposed = false;

    protected virtual void Dispose(bool disposing)
    {
        if (!disposed)
        {
            if (disposing)
            {
                if (this.pollingTask != null)
                {
                    CancelPolling();
                    if (this.pollingTask.Status == TaskStatus.RanToCompletion ||
                         this.pollingTask.Status == TaskStatus.Faulted ||
                         this.pollingTask.Status == TaskStatus.Canceled)
                    {
                        this.pollingTask.Dispose();
                        this.pollingTask = null;
                    }
                }
                if (this.context != null)
                {
                    this.context.Dispose();
                    this.context = null;
                }
                if (this.pubSocket != null)
                {
                    this.pubSocket.Dispose();
                    this.pubSocket = null;
                }
                if (this.source != null)
                {
                  this.source.Dispose();
                  this.source = null;
                }
            }
            disposed = true;
        }
    }

    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this);
    }

    ~MessageBroker()
    {
        Dispose(false);
    }
}

回测“引擎”用于执行每个回测,首先构建一个包含每个“测试”(单元测试)和要分派到 C++ 应用程序的消息的字典。

这里是 DispatchTests 方法:

private void DispatchTests(ConcurrentDictionary<Test, List<Taurus.FeedMux>> feedMuxCollection)
{
    broker = new MessageBroker();
    broker.MessageRecieved = new Progress<Taurus.FeedMux>(OnMessageRecieved);
    testCompleted = new ManualResetEvent(false);

    try
    {
        // Loop through the tests. 
        foreach (var kvp in feedMuxCollection)
        {
            testCompleted.Reset();
            Test t = kvp.Key;
            t.Bets = new List<Taurus.Bet>();
            foreach (Taurus.FeedMux mux in kvp.Value)
            {
                token.ThrowIfCancellationRequested();
                broker.Dispatch(mux);
            }
            broker.Dispatch(new Taurus.FeedMux()
            {
                type = Taurus.FeedMux.Type.PING,
                ping = new Taurus.Ping() { event_id = t.EventID }
            });
            testCompleted.WaitOne(); // Wait until all messages are received for this test. 
        }
        testCompleted.Close();
    }
    finally
    {
        broker.Dispose(); // Dispose the broker.
    }
}

在结尾处的PING信息是告诉C++我们已经完成了。然后我们强制等待,以便在从C++代码接收到所有返回之前不会分派下一个[unit]测试 - 我们使用ManualResetEvent来实现这一点。
当C++接收到PING消息时,它会立即将该消息发送回来。我们通过OnMessageRecieved处理接收到的消息,并且PING告诉我们设置ManualResetEvent.Set(),以便我们可以继续进行单元测试;“接下来请”...
private async void OnMessageRecieved(Taurus.FeedMux mux)
{
    string errorMsg = String.Empty;
    if (mux.type == Taurus.FeedMux.Type.MSG)
    {
        // Do stuff.
    }
    else if (mux.type == Taurus.FeedMux.Type.PING)
    {
        // Do stuff.

        // We are finished reciving messages for this "unit test"
        testCompleted.Set(); 
    }
}

我的问题是,上面的finally中的broker.Dispose()从未被执行。我知道在后台线程上执行的finally块不能保证被执行。
上面划掉的文字是因为我搞乱了代码;我停止了父线程,而子线程还没有完成。然而,仍然存在问题...
现在broker.Dispose()被正确调用,并且在这个方法中,我尝试取消轮询线程并正确地处理Task,以避免多次订阅。
为了取消线程,我使用CancelPolling()方法。
private void CancelPolling()
{
    source.Cancel();
    pollerCancelled.WaitOne(); <- Blocks here waiting for cancellation.
    pollerCancelled.Close();
}

但是在StartPolling()方法中

while (true)
{
    buffer = subSocket.Receive();
    MessageRecieved.Report(buffer.ToObject<Taurus.FeedMux>());
    if (this.token.IsCancellationRequested)
        this.token.ThrowIfCancellationRequested();
}
ThrowIfCancellationRequested()从未被调用,线程也从未被取消,因此无法正确处理。轮询线程被subSocket.Receive()方法阻塞。

现在,我不清楚如何实现我的目标,我需要在除了用于轮询消息的线程之外的线程上调用broker.Dispose()/PollerCancel()并强制取消。无论如何,我都不想以任何代价使用线程中止。

基本上,在执行下一个回测之前,我希望正确处理broker的处理,我该如何正确处理这个问题?将轮询拆分并在单独的应用程序域中运行可以吗?

我已经尝试在OnMessageRecived处理程序中进行处理,但这显然在与轮询相同的线程上执行,并且不是正确的处理方式,没有调用其他线程,它会被阻塞。

最佳实践是什么有没有相关的模式可供参考?

感谢您的时间。


我不明白问题出在哪里。你说 broker.Dispose() 从未被执行。但是为什么呢?你有进行调试并查看原因吗? - Sriram Sakthivel
2个回答

2
这就是我最终解决这个问题的方法(虽然我愿意采用更好的解决方案!)
public class FeedMuxMessageBroker : IMessageBroker<Taurus.FeedMux>, IDisposable
{
    // Vars.
    private NetMQContext context;
    private PublisherSocket pubSocket;
    private Poller poller;

    private CancellationTokenSource source;
    private CancellationToken token;
    private ManualResetEvent pollerCancelled;

    /// <summary>
    /// Default ctor.
    /// </summary>
    public FeedMuxMessageBroker()
    {
        context = NetMQContext.Create();

        pubSocket = context.CreatePublisherSocket();
        pubSocket.Connect(PublisherAddress);

        pollerCancelled = new ManualResetEvent(false);
        source = new CancellationTokenSource();
        token = source.Token;
        StartPolling();
    }

    #region Methods.
    /// <summary>
    /// Send the mux message to listners.
    /// </summary>
    /// <param name="message">The message to dispatch.</param>
    public void Dispatch(Taurus.FeedMux message)
    {
        pubSocket.Send(message.ToByteArray<Taurus.FeedMux>());
    }

    /// <summary>
    /// Start polling for messages.
    /// </summary>
    private void StartPolling()
    {
        Task.Run(() =>
            {
                using (var subSocket = context.CreateSubscriberSocket())
                {
                    byte[] buffer = null;
                    subSocket.Options.ReceiveHighWatermark = 1000;
                    subSocket.Connect(SubscriberAddress);
                    subSocket.Subscribe(String.Empty);
                    subSocket.ReceiveReady += (s, a) =>
                    {
                        buffer = subSocket.Receive();
                        if (MessageRecieved != null)
                            MessageRecieved.Report(buffer.ToObject<Taurus.FeedMux>());
                    };

                    // Poll.
                    poller = new Poller();
                    poller.AddSocket(subSocket);
                    poller.PollTillCancelled();
                    token.ThrowIfCancellationRequested();
                }
            }, token).ContinueWith(ant => 
                {
                    pollerCancelled.Set();
                }, TaskContinuationOptions.OnlyOnCanceled);
    }

    /// <summary>
    /// Cancel polling to allow the broker to be disposed.
    /// </summary>
    private void CancelPolling()
    {
        source.Cancel();
        poller.Cancel();

        pollerCancelled.WaitOne();
        pollerCancelled.Close();
    }
    #endregion // Methods.

    #region Properties.
    /// <summary>
    /// Event that is raised when a message is recived. 
    /// </summary>
    public IProgress<Taurus.FeedMux> MessageRecieved { get; set; }

    /// <summary>
    /// The address to use for the publisher socket.
    /// </summary>
    public string PublisherAddress { get { return "tcp://127.0.0.1:6500"; } }

    /// <summary>
    /// The address to use for the subscriber socket.
    /// </summary>
    public string SubscriberAddress { get { return "tcp://127.0.0.1:6501"; } }
    #endregion // Properties.

    #region IDisposable Members.
    private bool disposed = false;

    /// <summary>
    /// Dispose managed resources.
    /// </summary>
    /// <param name="disposing">Is desposing.</param>
    protected virtual void Dispose(bool disposing)
    {
        if (!disposed)
        {
            if (disposing)
            {
                CancelPolling();
                if (pubSocket != null)
                {
                    pubSocket.Disconnect(PublisherAddress);
                    pubSocket.Dispose();
                    pubSocket = null;
                }
                if (poller != null)
                {
                    poller.Dispose();
                    poller = null;
                }
                if (context != null)
                {
                    context.Terminate();
                    context.Dispose();
                    context = null;
                }
                if (source != null)
                {
                    source.Dispose();
                    source = null;
                }
            }

            // Shared cleanup logic.
            disposed = true;
        }
    }

    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this);
    }

    /// <summary>
    /// Finalizer.
    /// </summary>
    ~FeedMuxMessageBroker()
    {
        Dispose(false);
    }
    #endregion // IDisposable Members.
}

我们使用来自NetMQ的Poller类以相同的方式进行轮询。在任务继续中,我们设置了这样的内容,以确保PollerTask都被取消。然后我们就可以放心地处理...


你的解决方案看起来不错。你也可以使用套接字选项DontWait(在Receive上也有一个bool dontWait重载),或者在套接字上设置自定义的ReceiveTimeout来执行自己的轮询循环。你可能会发现使用NetMQMonitor获取各种状态更改事件(如连接和断开连接)很有用。顺便提一下,为了线程安全,在调用MessageReceived事件之前,你应该将其复制到一个临时变量中。除非你正在清理一些在你的示例中看不到的未托管资源,否则你应该删除终结器。 - Zer0
在调用MessageReceived事件时,为了线程安全,你应该将其复制到一个临时变量中。但是,事件处理程序将被调用在与调用线程相同的线程上,不是吗?此外,这也是一个IProgress<Taurus.FeedMux>对象,这会有任何影响吗?关于finally块,你指的是什么?上面的解决方案中没有finally-我强制使用Dispose方法来取消和释放线程/Task,因此继续通知ManualResetEvent可以继续。非常感谢您的时间,如果有更多的见解,请告诉我。 - MoonKnight
我误解了 MessageReceived. 我以为它是一个事件并且被注释为此。但是问题基本上在 null 检查之后,调用另一个线程可能将其设置为null。所以调用事件的常见模式是首先将它们复制到一个临时变量中。即使不是这种情况,养成这种好习惯也是很有帮助的。我的意思是 _finalizer_,即 FeedMuxMessageBroker 方法中的 ~Finalizers 用于清理非托管资源,但您没有任何非托管资源需要清理。 - Zer0

1

主题的高级视角

你专注于创建测试框架的努力表明,你的意愿是开发一种严谨和专业级别的方法,这使我首先向这样勇敢的尝试致以敬意。

虽然测试是提供合理定量证据的重要活动,证明被测试系统符合定义的期望,但成功取决于测试环境与真实部署条件的接近程度。

可以认为,在不同基础上进行测试并不能证明真实部署将在根本不同于已测试环境的环境中按预期运行。


逐元素控制还是状态控制,这是个问题。

您的努力(至少在发布原帖时)集中于代码架构,试图保持实例就地并尝试在下一个测试批次开始之前重新设置Poller实例的内部状态。

在我看来,测试有一些原则需要遵循,如果您追求专业的测试:

  • 测试可重复性原则(测试的重新运行应该提供相同的结果,从而避免提供仅提供结果“彩票”的准测试)

  • 非干预性测试原则(测试的重新运行不应受到“外部”干扰,不受测试方案控制)

说了这么多,让我引用一些哈里·马科维茨的笔记,他因其杰出的量化投资组合优化研究而获得诺贝尔奖。

最好退一步,全面掌控元素的生命周期

CACI模拟公司(Harry Markowitz的公司之一)在90年代初开发了他们的旗舰软件框架COMET III - 一个非常强大的模拟引擎,用于大型、复杂的设计原型和性能模拟,涉及大规模计算/网络/电信网络中的流程。COMET III给人留下最深刻的印象是它的能力,可以生成包括可配置的预测试“热身”预加载的测试场景,使得被测试元素进入类似于机械折磨试验中的“疲劳”状态或核电站冶金学家所说的氢扩散脆性状态。是的,一旦你深入到算法、节点缓冲区、内存分配、管道化/负载均衡/网格处理架构选择、容错开销、垃圾收集策略和有限资源共享算法如何工作并影响(在真实使用负载模式“压力”下)端到端性能/延迟的低级细节,这个功能就是不可或缺的。
这意味着,一个单独的实例相关的简单状态控制是不够的,因为它不能提供测试可重复性和测试隔离/非干预行为的手段。简单来说,即使你找到了一种“重置”Poller实例的方法,这也不能让你进行现实测试,并保证可以进行预测试热身。
需要一步回退和更高层次的抽象和测试场景控制。
如何应用于OP问题?
- 不仅仅是状态控制 - 创建多层架构/控制平面/分离信号 - 使用ZeroMQ支持此目标的方式
- 创建超级结构作为非平凡模式 - 使用测试场景内使用的实例的完整生命周期控制 - 保持ZeroMQ-maxims:零共享、零阻塞等 - 受益于Multi-Context()

谢谢您的答复,但我不确定它是否回答了我的问题。 我的测试资源管理器的架构已经不是简单的了,而且想法是尽可能地简化,不要过度抽象和不必要地复杂化。 如果您可以提供“更高层次的抽象”或“超级结构”的示例,将不胜感激。 目前,我不会为每个后向测试“共享” MessageBroker,这就是为什么我想正确处理它的原因。 感谢您的时间。 - MoonKnight
作为“超级结构”的一个例子,可以查看带有消息节点的图片>>>https://dev59.com/PYLba4cB1Zd3GeqPgpc3#25369515。这种复合元素可以在多个不相交的控制平面上实现消息传递策略——一个传输(您的SUT/DUT),下一个“以上”作为测试控制信令,下一个“以上”作为性能控制用于监视/日志记录等。有意将传输Context()实例与其他信令服务的Context()实例分开有助于保持两个关键原则——隔离原则+可重复性原则。 - user3666197
注意:我认为减少极端审查对知识社区更有益 - @halfer - 感谢您坚持不懈地编辑许多帖子以消除语法错误(AI/ML可能会在不久的将来进入形式,以执行此类角色,使您有更多时间进行某些确实创造性的内容,而不仅仅是删除重复的模式 (许多,我承认)无意中的错误)。但是,您为什么还要删除有意的内容? (引自)“...删除关于诺贝尔奖获得者的重复材料” 1)它具有意义2)诺贝尔奖是授予的,而不是赢得的 - user3666197

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接