如何使用Disruptor处理多种消息类型

19
我的系统有两种不同类型的消息 - 类型 A 和 B。每个消息具有不同的结构 - 类型 A 包含一个 int 成员,而类型 B 包含一个 double 成员。我的系统需要将这两种类型的消息传递给多个业务逻辑线程。减少延迟非常重要,因此我正在研究使用 Disruptor 以机械方式传递消息从主线程到业务逻辑线程。
问题在于 disruptor 只接受环形缓冲区中一种类型的对象。这很合理,因为 disruptor 预先分配了环形缓冲区中的对象。然而,这也使得通过 disruptor 向业务逻辑线程传递两种不同类型的消息变得困难。据我所知,我有四个选择:
1. 配置 disruptor 使用包含 固定大小的字节数组 的对象(正如 How should one use Disruptor (Disruptor Pattern) to build real-world message systems? 推荐的那样)。在这种情况下,主线程必须将消息编码为字节数组,然后将其发布到 disruptor 中,每个业务逻辑线程在收到消息后都必须将字节数组解码回对象。这种设置的缺点是业务逻辑线程不是真正共享 disruptor 的内存 - 而是从 disruptor 提供的字节数组创建新对象(从而创建垃圾)。这种设置的优点是所有业务逻辑线程都可以从同一个 disruptor 中读取多种不同类型的消息。
2. 配置 disruptor 使用单个类型的对象,但创建 多个 disruptors,每种对象类型都有一个 disruptor。在上面的例子中,将会有两个独立的 disruptor - 一个用于类型 A 的对象,另一个用于类型 B 的对象。这种设置的优点是主线程无需将对象编码为字节数组,业务逻辑线程少创建垃圾,因为它们可以共享 disruptor 中使用的相同对象。这种设置的缺点是每个业务逻辑线程都要订阅来自多个 disruptors 的消息。
3. 仅使用一种类型的消息,并且在消息中包含一个标识符,以告知业务逻辑线程消息的类型。在收到消息后,业务逻辑线程必须根据标识符对消息进行分类处理。这种设置的优点是简单,但它可能会影响性能,因为分类处理可能会导致延迟增加。
4. 重构系统设计,使得只有一种类型的消息存在。如果这种方案可行,那么肯定是最好的选择。
  • 将 disruptor 配置为使用一个包含消息 A 和 B 所有字段的"超级"对象的单个类型。这非常违反面向对象的风格,但会在选项#1和#2之间取得折衷。

  • 将 disruptor 配置为使用对象引用。但在这种情况下,我失去了对象预分配和内存排序带来的性能优势。

  • 你对此情况有什么建议?我觉得选项#2是最简洁的解决方案,但我不知道消费者是否可以技术上订阅来自多个 disruptor 的消息。如果有人能提供关于如何实现选项#2的示例,那将不胜感激!


    6
    Michael Barker在Disruptor Google Group中回答了我的问题。请查看他的回复:https://groups.google.com/d/msg/lmax-disruptor/clUkJaFMsZg/54fKplz21MwJ - Ben Baumgold
    4
    如果这回答了你的问题,请考虑将其转化为一个答案并标记为已接受。 - cic
    我已经成功地使用了方法(2),使用链接的Disruptors,其中第一个处理程序将数据传递给第二个。 - AlexO
    3个回答

    3
    配置 disruptor 以使用包含固定大小字节数组的对象(如 How should one use Disruptor (Disruptor Pattern) to build real-world message systems? 推荐)。在这种情况下,主线程必须将消息编码为字节数组,然后发布到 disruptor 中,每个业务逻辑线程都必须在接收时将字节数组解码回对象。这种设置的缺点是业务逻辑线程并没有真正共享 disruptor 的内存 - 而是从 disruptor 提供的字节数组创建新对象(从而创建垃圾)。这种设置的优点是所有业务逻辑线程都可以从同一个 disruptor 中读取多种不同类型的消息。
    这是我首选的方法,但我们的用例略有不同,我们使用Disruptor的每个地方都会从某种I/O设备接收或发送,所以我们的基本货币是字节数组。您可以通过使用飞行权重方法来避免对象创建并进行编组。要查看此示例,请参阅我在Devoxx上展示的示例中使用的Javolution的Struct和Union类(https://github.com/mikeb01/ticketing)。如果您能在从事件处理程序的onEvent调用返回对象之前完全处理该对象,则此方法效果很好。如果事件需要超出该点存在,则需要对数据进行某种副本,例如将其反序列化为对象。
    配置 disruptor 使用单个对象类型,但创建多个 disruptor,每种对象类型一个。在上面的情况下,将有两个单独的 disruptor - 一个用于类型 A 的对象,另一个用于类型 B 的对象。这种设置的好处是主线程不必将对象编码为字节数组,并且业务逻辑线程可以共享与 disruptor 中使用的相同对象(不会创建垃圾)。这种设置的缺点是每个业务逻辑线程都必须以某种方式订阅来自多个 disruptor 的消息。
    尚未尝试此方法,您可能需要自定义 EventProcessor,以便从多个环形缓冲区轮询。
    配置 disruptor 使用包含消息 A 和 B 所有字段的单个“超级”对象。这非常违反 OO 风格,但可以在选项 #1 和 #2 之间做出妥协。配置 disruptor 使用对象引用。但是,在这种情况下,我失去了对象预分配和内存排序的性能优势。
    我们在一些情况下做过这件事,在某些情况下,缺乏预分配是可以容忍的。它运行得很好。如果您正在传递对象,则需要确保在使用完毕后将其清空。我们发现对于“超级”对象使用双重调度模式可以使实现相当干净。其中一个缺点是,与直接对象数组相比,您将在标记阶段期间遍历更多的活动对象,从而导致稍微长一些的GC停顿时间。

    针对此情况,您有什么建议?我觉得选项#2是最清洁的解决方案,但我不知道消费者是否可以在技术上订阅来自多个disruptor的消息。如果有人可以提供有关如何实现选项#2的示例,那将不胜感激!

    如果您想要完全灵活地使用数据,则可以不使用环形缓冲区,而是直接与Sequencer交互,并根据您认为最合适的方式定义对象布局。

    2
    我相信你现在已经找到了解决方案,Ben Baumgold。你的第四个问题(或第三个问题)可以通过创建事件持有者来轻松实现。将其视为对象的枚举。为了加快查找速度,应该使用枚举类型来丰富事件。请注意,我在持有者中存储对原始事件的引用。在插入环形缓冲区时,创建副本构造函数或克隆()并复制事件可能更合适。
    例如说明:
    //这是事件中使用的枚举
    public enum MyEventEnum {
    EVENT_TIMER,
    EVENT_MARKETDATA;
    }
    

    // 这是一个占位符。在任何时候,环形缓冲区中的此实例仅存储由array[type.ordinal()]索引的一个事件。为什么要使用数组在代码中应该很明显。

    public class RingBufferEventHolder {    
     private MyEventEnum;   
     private EventBase array[];
    
     public RingBufferEventHolder() {
        array=new EventBase[MyEventEnum.values().length]; 
     }
    
     // TODO: null the rest
     public void setEvent(EventBase event) {
        type=event.getType();
        switch( event.getType() ) {
            case EVENT_TIMER:
                array[MyEventEnum.EVENT_TIMER.ordinal()]=event;
                break;
            case EVENT_MARKETDATA:
                array[MyEventEnum.EVENT_MARKETDATA.ordinal()]=event;
                break;
            default:
                throw new RuntimeException("Unknown event type " + event );
        }
    }
    

    // 发布事件

       EventBase newEvent=new EventMarketData(....);
       // prepare
       long nextSequence = ringBuffer.next(); 
       RingBufferEventHolder holder = ringBuffer.get(nextSequence);
       holder.setEvent(newEvent);
       // make the event available to EventProcessors 
       ringBuffer.publish(nextSequence);
    

    与我关于同一问题的实现太接近了。主要区别在于使用了数组。 我的事件类看起来像这样; EventBase{ private EventTypeEnum type; // [NEW_ORDER, CANCEL_ORDER, L2MarketData, MarketUpdates] private OrderEvent orderEvent; private MarketDataEvent marketEvent; .... } - cengizkrbck

    0

    与Vortex的答案太相似,但在保留子事件方面有所区别。 它是#3和#4的混合体。如果我能管理业务逻辑复杂性,我会选择#2多个distruptors。

    首要关注的是相对于基于数组的枚举事件类型实现的优势是不同事件类型的共享对象类型。

    public enum ExchangeEventType{
        PLACE_ORDER,   // -> OrderEvent
        CANCEL_ORDER,  // -> OrderEvent
        MARKET_FEED,   // -> MarketEvent
        MARKET_UPDATE, // -> MarketEvent
        ADD_USER,      // -> AccountEvent
        SUSPEND_USER,  // -> AccountEvent
        RESUME_USER    // -> AccountEvent
    }    
    
    public ExchangeEvent{
      private EventType type;
      private EventResultCode resultCode;
      private long timestamp;
    
      // event type objects
      private OrderEvent orderEvent;
      private MarketEvent marketEvent;
      private AccountEvent accountEvent;
    }
    

    在业务逻辑中,多个处理器消耗和产生多种类型的事件,因此我有意选择不使用单独的分离器来进行权衡。
    例如:
    • #1 引擎使用 OrderEventAccountEvent
    • #2 引擎使用 MarketEventOrderEvent

    网页内容由stack overflow 提供, 点击上面的
    可以查看英文原文,
    原文链接