在Rx中,是否有可能在不同线程上调用订阅者的OnNext方法?

5

我对Rx很陌生。我想知道是否可以将消息分发给不同的订阅者,以便它们在不同的线程上运行?IObservable如何控制它?就我所了解的,普通的Subject实现会在单个线程上依次调用所有订阅者。


public class Subsciber : IObserver<int>
{
    public void OnNext(int a)
    {
        // Do something
    }
    public void OnError(Exception e)
    {
        // Do something
    }
    public void OnCompeleted()
    {
    }

} 

public static class Program
{
   public void static Main()
   {
       var observable = new <....SomeClass....>();
       var sub1 = new Subscriber();
       var sub2 = new Subscriber();
       observable.Subscribe(sub1);
       observable.Subscribe(sub2);
       // some waiting function 
   }
}

如果我使用Subject作为“SomeClass”,那么直到sub1的OnNext()完成后,sub2的OnNext()才会被调用。如果sub1需要很长时间,我不希望它延迟sub2的接收。有人能告诉我Rx如何允许这种对于SomeClass的实现吗。


这个 Observable 是热的还是冷的? - Richard
2个回答

8
您编写的代码几乎可以并行运行可观察对象。如果您将观察者编写为以下方式:
public class Subscriber : IObserver<int>
{
    public void OnNext(int a)
    {
        Console.WriteLine("{0} on {1} at {2}",
            a,
            Thread.CurrentThread.ManagedThreadId,
            DateTime.Now.ToString());
    }
    public void OnError(Exception e)
    { }
    public void OnCompleted()
    { }
} 

接着运行以下代码:

var observable =
    Observable
        .Interval(TimeSpan.FromSeconds(1.0))
        .Select(x => (int)x)
        .Take(5)
        .ObserveOn(Scheduler.ThreadPool);
var sub1 = new Subscriber();
var sub2 = new Subscriber();
observable.Subscribe(sub1);
observable.Subscribe(sub2);
Thread.Sleep(10000);

将会产生以下结果:
0 on 28 at 2011/10/20 00:13:49
0 on 16 at 2011/10/20 00:13:49
1 on 29 at 2011/10/20 00:13:50
1 on 22 at 2011/10/20 00:13:50
2 on 27 at 2011/10/20 00:13:51
2 on 29 at 2011/10/20 00:13:51
3 on 27 at 2011/10/20 00:13:52
3 on 19 at 2011/10/20 00:13:52
4 on 27 at 2011/10/20 00:13:53
4 on 27 at 2011/10/20 00:13:53

它已经在不同的线程上并行运行订阅。

我使用的重要方法是.ObserveOn扩展方法 - 它使这个工作成为可能。

你应该记住,观察者通常不共享相同的可观测实例。订阅可观测对象就像有效地将从可观测对象源到观察者的唯一“链”连接起来。这与对可枚举项两次调用GetEnumerator类似,你不会共享相同的枚举器实例,而会得到两个唯一的实例。

现在,我想描述一下我所说的链是什么意思。我将给出Reflector.NET提取的代码Observable.GenerateObservable.Where来说明这一点。

以这段代码为例:

var xs = Observable.Generate(0, x => x < 10, x => x + 1, x => x);
var ys = xs.Where(x => x % 2 == 0);
ys.Subscribe(y => { /* produces 0, 2, 4, 6, 8 */ });

在底层,GenerateWhere都会创建内部Rx类AnonymousObservable<T>的新实例。对于AnonymousObservable<T>的构造函数需要一个Func<IObserver<T>, IDisposable>委托,每当它接收到对Subscribe的调用时就会使用这个委托。下面是从Reflector.NET上稍微整理过的Observable.Generate<T>(...)代码:
public static IObservable<TResult> Generate<TState, TResult>(
    TState initialState,
    Func<TState, bool> condition,
    Func<TState, TState> iterate,
    Func<TState, TResult> resultSelector,
    IScheduler scheduler)
{
    return new AnonymousObservable<TResult>((IObserver<TResult> observer) =>
    {
        TState state = initialState;
        bool first = true;
        return scheduler.Schedule((Action self) =>
        {
            bool flag = false;
            TResult local = default(TResult);
            try
            {
                if (first)
                {
                    first = false;
                }
                else
                {
                    state = iterate(state);
                }
                flag = condition(state);
                if (flag)
                {
                    local = resultSelector(state);
                }
            }
            catch (Exception exception)
            {
                observer.OnError(exception);
                return;
            }
            if (flag)
            {
                observer.OnNext(local);
                self();
            }
            else
            {
                observer.OnCompleted();
            }
        });
    });
}

Action self 参数是一个递归调用,以迭代输出值。您会注意到,在此代码中,没有存储 observer 或将值粘贴到多个观察者中。此代码对于每个新的观察者运行一次。

从 Reflector.NET 清理稍微的代码 Observable.Where<T>(...) 如下:

public static IObservable<TSource> Where<TSource>(
    this IObservable<TSource> source,
    Func<TSource, bool> predicate)
{
    return new AnonymousObservable<TSource>(observer =>
        source.Subscribe(x =>
        {
            bool flag;
            try
            {
                flag = predicate(x);
            }
            catch (Exception exception)
            {
                observer.OnError(exception);
                return;
            }
            if (flag)
            {
                observer.OnNext(x);
            }
        }, ex => observer.OnError(ex), () => observer.OnCompleted));
}

再次说明,这段代码不能跟踪多个观察者。它有效地调用了Subscribe方法,将自己的代码作为观察者传递给基础的source可观察对象。

在上面的例子中,您应该看到,订阅Where会创建一个对Generate的订阅,因此这是一系列可观察对象的链接。实际上,它是在一系列AnonymousObservable对象上链接订阅调用。

如果您有两个订阅,那么您就有两条链。如果您有1,000个订阅,那么您就有1,000条链。

现在,只是作为一个侧面的注意事项 - 即使有IObservable<T>IObserver<T>接口 - 您非常非常少地需要在自己的类中实际实现这些接口。内置的类和运算符处理了99.99%的情况。这有点像IEnumerable<T> - 你有多少次需要自己实现这个接口呢?

如果需要进一步解释,请告诉我是否有帮助。


抱歉上面的格式不好。但是我很惊讶地发现,对于我在控制台键入的每个整数,d2和d交替在控制台上写入。这是否是您所说的“大多数订阅形成唯一链”的意思? - ada
1
@ada - 我已经编辑了我的答案,以更好地定义“链接”是什么意思。 关于你在上面示例中的代码,通过两次订阅src,你正在做的是对GetInput进行两次调用,并且由于Console.ReadLine是一个阻塞调用,因此两个线程都在等待它们的Console.ReadLine调用返回值。 控制台按系列处理多个调用,而不是并行处理,因此输入的每一行只会传递给一个调用者,另一个调用者必须等待下一行。 这就是为什么你会得到交替的d2d值的原因。 - Enigmativity
现在我明白了链式调用和yield表达式。因此,如果我需要在两个观察者上获得相同的控制台消息,我需要在它们之间添加另一个可观测对象以订阅控制台。实际上,我正在实现一个新的Observable<String>类。我试图说,创建IObserver<T>和IObservable<T>并不是很罕见。其中一个实例是在socket编程中,我可以在负责所有连接部分并将消息简单地分派到接收者的类中实现IObservable。 - ada
你不应该实现一个类,而是使用一个已经存在的主题。 - Enigmativity
@ada - 尝试使用以下src的定义:var src = new Subject<string>(); Observable.Generate(0, x => true, x => x, x => Console.ReadLine(), Scheduler.NewThread).Subscribe(src); - Enigmativity
显示剩余3条评论

2
如果你有一个 IObservable,并且需要强制订阅在不同的线程上运行,那么可以使用ObserveOn函数。
如果运行以下代码,则会强制数字生成器在不同的线程上下文中运行。你还可以使用EventLoopScheduler并指定要使用的System.Thread,设置优先级、名称等等...
void Main()
{
    var numbers = Observable.Interval(TimeSpan.FromMilliseconds(100));

    var disposable = new CompositeDisposable()
    {
       numbers.ObserveOn(Scheduler.TaskPool).Subscribe(x=> Console.WriteLine("TaskPool: "+ Thread.CurrentThread.ManagedThreadId)),
       numbers.ObserveOn(Scheduler.ThreadPool).Subscribe(x=> Console.WriteLine("ThreadPool: "+ Thread.CurrentThread.ManagedThreadId)),
       numbers.ObserveOn(Scheduler.Immediate).Subscribe(x=> Console.WriteLine("Immediate: "+ Thread.CurrentThread.ManagedThreadId))
    };

    Thread.Sleep(1000);
    disposable.Dispose();
}

输出

Immediate: 10
ThreadPool: 4
TaskPool: 20
TaskPool: 4
ThreadPool: 24
Immediate: 27
Immediate: 10
TaskPool: 24
ThreadPool: 27
Immediate: 24
TaskPool: 26
ThreadPool: 20
Immediate: 26
ThreadPool: 24
TaskPool: 27
Immediate: 28
ThreadPool: 27
TaskPool: 26
Immediate: 10

请注意,我使用 CompositeDisposable 在最后处理所有订阅。例如,在LinqPad中如果您不这样做, Observable.Interval 将继续在内存中运行,直到您关闭进程。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接