.NET - 使用.NET响应式编程实现多线程的生产者消费者模型

3

我正在努力使用.NET响应式技术构建生产者..消费者模式。生产者从Kafka消息总线中读取消息。一旦读取到消息,就需要将其移交给消费者来处理该消息。

我已经能够使用.NET响应式技术完成了这个任务。但是,我注意到消费者在与生产者相同的线程上处理消息。请参见下面的代码。目标是:拥有一个单独的生产者线程,从总线中读取消息。然后,在另一个线程上将其移交给消费者以处理消息。我目前的代码如下:

 // Producer Code
 private Subject<LGMessage> _onMessageSubject = new Subject<LGMessage>();

 private IObserver<LGMessage> messageBusObserver;

 protected IObservable<LGMessage> _onMessageObservable
    {
        get
        {
            return _onMessageSubject.AsObservable();
        }
    }


public void AddObserver(IObserver<LGMessage> observer)
    {
       _onMessageObservable.ObserveOn(NewThreadScheduler.Default).Subscribe(observer);


    }


// Read is called when the message is read from the bus
public bool Read(Message<string, string> msg)
    {

            // add the message to the observable
            _onMessageSubject.OnNext(msg.Value);


    }

// Consumer Code
public virtual void OnNext(string value)
    {
        Console.WriteLine("Thread {0} Consuming",          

        Thread.CurrentThread.ManagedThreadId);

        Console.WriteLine("{1}: Message I got from bus: {0}", value.Key, 
         this.Name);
        // Take Action
    }

你似乎正在实现自己的观察者,并且写了些不符合惯用法的代码。你能否为我们发布一个 [mcve] 以便我们查看完整用法? - Enigmativity
如果你认为手写的多线程代码能够提供更好的性能,那么最好先进行基准测试。我建议使用TPL。 - Asti
1个回答

2
从您的代码中很难确定,但看起来您没有暴露可观察对象。这会阻止下游使用 Rx 运算符。在您的情况下,您想要使用 线程运算符
在生产者中,不要暴露 AddObserver(IObserver<string> observer),而应该暴露类似以下内容的东西:
public IObservable<string> Messages => _onMessageSubject.AsObservable();

一个消费者可以做类似于以下的事情
Messages
    .ObserveOn(NewThreadScheduler.Default)
    .Subscribe(consumerObserver);

编辑:
以下代码按照我的意图正常工作:
var subject = new Subject<int>();

var observer1 = new AnonymousObserver<int>(i => Console.WriteLine($"Observer1: Observed {i} on thread {Thread.CurrentThread.ManagedThreadId}."));
var observer2 = new AnonymousObserver<int>(i => Console.WriteLine($"Observer2: Observed {i} on thread {Thread.CurrentThread.ManagedThreadId}."));
var observer3 = new AnonymousObserver<int>(i => Console.WriteLine($"Observer3: Observed {i} on thread {Thread.CurrentThread.ManagedThreadId}."));

subject.AsObservable().ObserveOn(NewThreadScheduler.Default).Subscribe(observer1);
subject.AsObservable().ObserveOn(NewThreadScheduler.Default).Subscribe(observer2);
subject.AsObservable().ObserveOn(NewThreadScheduler.Default).Subscribe(observer3);

subject.OnNext(1);
subject.OnNext(2);
subject.OnNext(3);
subject.OnCompleted();

这是输出结果(Observer1获得线程14,Observer2获得线程15,Observer3获得线程16):
Observer1: Observed 1 on thread 14.
Observer2: Observed 1 on thread 15.
Observer1: Observed 2 on thread 14.
Observer1: Observed 3 on thread 14.
Observer2: Observed 2 on thread 15.
Observer2: Observed 3 on thread 15.
Observer3: Observed 1 on thread 16.
Observer3: Observed 2 on thread 16.
Observer3: Observed 3 on thread 16.

谢谢。在修改代码后,我可以看到消费者正在单独的线程上处理消息。然而,所有消费者都在同一个线程上处理消息。我已经更新了代码。请您确认这是否是正确的模式! - Ashish Bhatia
谢谢。在观察者处理完消息后,我需要将消息提交回Kafka。在响应式编程中,是否有推荐的模式,可以让观察者通知可观察对象消息已成功或未成功处理? - Ashish Bhatia

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接