如何将多个IObservable<bool>组合成一个复合的bool订阅值

3
我想创建一种基于Rx的WPF ICommand。我想要做的是通过组合任意数量的IObservable<bool>流来控制CanExecute。
我的想法是使用所有谓词的最新逻辑AND值来控制ICommand.CanExecute(object parameter)方法的实现。我不想等待所有谓词都产生结果,它应该使用其中一个源谓词流OnNexts(产生一个值)。
我在尝试解决如何连接任何谓词以导致ICommand.CanExecute产生新值时遇到了困难。
暂且不考虑ICommand的实际实现(因为我的问题更多地涉及Rx方面),有人能建议我如何连接一堆谓词(IObservable<bool>),每当创建它们的底层事物发生变化时就会产生结果,但也可以共同创建一个总布尔值,我可以订阅它。最终值将是当前谓词流值的逻辑AND。
我希望不需要订阅所有谓词流,并且希望RX中有一个很酷的运算符,我可能已经忽略了。
我知道我可以合并流,但这不完全是我想要的行为,因为那只是合并了输入流的最新值。我还知道我可以CombineLatest,但这也不完全正确,因为它只在所有组合流产生值时才会产生结果。
我想要的是将流组合在一起,以便任何更改都会通知订阅者,但我还想知道组合谓词IObservable<bool>流的逻辑AND是什么,以便我可以从这个总体组合值驱动ICommand.CanExecute。
希望这有意义。
下面是一些骨架代码(我留下了一些注释掉的代码,展示了我的Rx命令想法背后的思考过程,因为它可能有助于说明我想要实现的内容)。
public class ViewModel : INPCBase
{
    private string title;
    private bool hasStuff;

    public ViewModel()
    {
        //Initialise some command with 1st predicate, and 
        // initial CanExecute value
        //SomeCommand = new ReactiveCommand(
        //    this.ObserveProperty(x => x.Title)
        //        .Select(x => !string.IsNullOrEmpty(x)), false);
        //SomeCommand.AddPredicate(this.ObserveProperty(x => x.HasStuff));
        //SomeCommand.CommandExecutedStream.Subscribe(x =>
        //    {
        //        MessageBox.Show("Command Running");
        //    });

        IObservable<bool> obsPred = this.ObserveProperty(x => x.Title)
          .Select(x => !string.IsNullOrEmpty(x))
          .StartWith(!string.IsNullOrEmpty(this.Title));
        IObservable<bool> obsPred2 = this.ObserveProperty(x => 
          x.HasStuff).StartWith(this.HasStuff);


        obsPred.Merge(obsPred2).Subscribe(x =>
            {
                //How do I get this to fire whenever obsPred OR 
                //obsPred2 fire OnNext, but also get a combined value (bool) 
                //of the AND of obsPred & obsPred2 (bearing in mind I may 
                //want more than 2 predicates, it should cope with any number of
                //IObservable<bool> predicates
            });
    }

    public string Title
    {
        get
        {
            return this.title;
        }
        set
        {
            RaiseAndSetIfChanged(ref this.title, value, () => Title);
        }
    }


    public bool HasStuff
    {
        get
        {
            return this.hasStuff;
        }
        set
        {
            RaiseAndSetIfChanged(ref this.hasStuff, value, () => HasStuff);
        }
    }

}
2个回答

2
你正在寻找CombineLatest操作符。
ISubject<bool> obsPred  = new BehaviorSubject<bool>(false);
ISubject<bool> obsPred2 = new BehaviorSubject<bool>(false);

Observable.CombineLatest(obsPred, obsPred2, (a, b)=>a&&b)
        .DistinctUntilChanged()
        .Dump();

obsPred.OnNext(true);
obsPred2.OnNext(true);
obsPred2.OnNext(true);
obsPred.OnNext(true);

obsPred.OnNext(false);

这将输出

False
True
False

使用DistinctUntilChanged()可以防止返回重复的连续值。
显然,将BehaviorSubject替换为您的属性可观察对象。

李,我确实尝试了CombineLatest/DistinctUntilChanged组合,但这是否意味着你必须等待所有输入流产生结果。老实说,我尝试了一些东西,可能有点混淆。我明天会再试一下。我相信你是对的。上次你就是对的。我看到你现在也离开了加拿大银行。喜欢新工作地方吗?至于我,我肯定会投入Rx,它很容易让人上瘾,你思考一下,然后突然就能做好。在我看来这很好。 - sacha barber
啊哈,我想我知道为什么我的使用CombineLatest的尝试没有像我预期的那样工作了。从你的博客中,“CombineLatest扩展方法允许您从两个序列中获取最新值,并使用给定的函数将它们转换为结果序列的值。每个输入序列都像Replay(1)一样缓存了最后一个值。一旦两个序列都产生了至少一个值,每次任一序列产生一个值时,都会将每个序列的最新输出传递给resultSelector函数...”,所以我认为我需要使用StartsWith(false)来作为初始值。 - sacha barber
明天我会尝试并告诉你,如果你的答案是正确的(当然它会是的;-)),我会给你投票的。 - sacha barber
好的,终于搞定了,我认为问题出在我如何存储合并流上,而且我也需要重新订阅。我已经发布了下面的可工作代码,如果你想检查一下,那就太好了。 - sacha barber

1

好的,这是我成功实现它的方法

public interface IReactiveCommand : ICommand
{
    IObservable<object> CommandExecutedStream { get; }
    IObservable<Exception> CommandExeceptionsStream { get; }
    void AddPredicate(IObservable<bool> predicate);
}

然后是实际的命令实现。
public class ReactiveCommand : IReactiveCommand, IDisposable
{
    private Subject<object> commandExecutedSubject = new Subject<object>();
    private Subject<Exception> commandExeceptionsSubjectStream = new Subject<Exception>();
    private List<IObservable<bool>> predicates = new List<IObservable<bool>>();
    private IObservable<bool> canExecuteObs;
    private bool canExecuteLatest = true;
    private CompositeDisposable disposables = new CompositeDisposable();

    public ReactiveCommand(IObservable<bool> initPredicate, bool initialCondition)
    {
        if (initPredicate != null)
        {
            canExecuteObs = initPredicate;
            SetupSubscriptions();
        }
        RaiseCanExecute(initialCondition);
    }


    private void RaiseCanExecute(bool value)
    {
        canExecuteLatest = value;
        this.raiseCanExecuteChanged(EventArgs.Empty);
    }


    public ReactiveCommand()
    {
         RaiseCanExecute(true);
    }


    private void SetupSubscriptions()
    {

        disposables = new CompositeDisposable();
        disposables.Add(this.canExecuteObs.Subscribe(
            //OnNext
            x =>
            {
                RaiseCanExecute(x);
            },
            //onError
            commandExeceptionsSubjectStream.OnNext
        ));
    }



    public void AddPredicate(IObservable<bool> predicate)
    {
        disposables.Dispose();
        predicates.Add(predicate);
        this.canExecuteObs = this.canExecuteObs.CombineLatest(predicates.Last(), (a, b) => a && b).DistinctUntilChanged();
        SetupSubscriptions();
    }

    bool ICommand.CanExecute(object parameter)
    {
        return canExecuteLatest;
    }

    public event EventHandler CanExecuteChanged;

    public void Execute(object parameter)
    {
        commandExecutedSubject.OnNext(parameter);
    }


    public IObservable<object> CommandExecutedStream
    {
        get { return this.commandExecutedSubject.AsObservable(); }
    }

    public IObservable<Exception> CommandExeceptionsStream
    {
        get { return this.commandExeceptionsSubjectStream.AsObservable(); }
    }


    protected virtual void raiseCanExecuteChanged(EventArgs e)
    {
        var handler = this.CanExecuteChanged;

        if (handler != null)
        {
            handler(this, e);
        }
    }

    public void Dispose()
    {
       disposables.Dispose();
    }
}

我在使用以下辅助程序

public static class ObservableExtensions
{
    public static IObservable<TValue> ObserveProperty<T, TValue>(
        this T source,
         Expression<Func<T, TValue>> propertyExpression
    )
        where T : INotifyPropertyChanged
    {
        return source.ObserveProperty(propertyExpression, false);
    }

    public static IObservable<TValue> ObserveProperty<T, TValue>(
        this T source,
        Expression<Func<T, TValue>> propertyExpression,
        bool observeInitialValue
    )
        where T : INotifyPropertyChanged
    {
        var memberExpression = (MemberExpression)propertyExpression.Body;

        var getter = propertyExpression.Compile();

        var observable = Observable
            .FromEvent<PropertyChangedEventHandler, PropertyChangedEventArgs>(
                h => new PropertyChangedEventHandler(h),
                h => source.PropertyChanged += h,
                h => source.PropertyChanged -= h)
            .Where(x => x.EventArgs.PropertyName == memberExpression.Member.Name)
            .Select(_ => getter(source));

        if (observeInitialValue)
            return observable.Merge(Observable.Return(getter(source)));

        return observable;
    }


    public static IObservable<string> ObservePropertyChanged<T>(this T source)
       where T : INotifyPropertyChanged
    {
        var observable = Observable
            .FromEvent<PropertyChangedEventHandler, PropertyChangedEventArgs>(
                h => new PropertyChangedEventHandler(h),
                h => source.PropertyChanged += h,
                h => source.PropertyChanged -= h)
            .Select(x => x.EventArgs.PropertyName);

        return observable;
    }
}

以下是如何连接它的示例

以下是一个示例ViewModel

public class ViewModel : INPCBase
{
    private string title;
    private bool hasStuff;

    public ViewModel()
    {
        IObservable<bool> initPredicate = this.ObserveProperty(x => x.Title).Select(x => !string.IsNullOrEmpty(x)).StartWith(!string.IsNullOrEmpty(this.Title));
        IObservable<bool> predicate = this.ObserveProperty(x => x.HasStuff).StartWith(this.HasStuff);
        SomeCommand = new ReactiveCommand(initPredicate, false);
        SomeCommand.AddPredicate(predicate);
        SomeCommand.CommandExecutedStream.Subscribe(x =>
            {
                MessageBox.Show("Command Running");
            });
    }

    public ReactiveCommand SomeCommand { get; set; }



    public string Title
    {
        get
        {
            return this.title;
        }
        set
        {
            RaiseAndSetIfChanged(ref this.title, value, () => Title);
        }
    }


    public bool HasStuff
    {
        get
        {
            return this.hasStuff;
        }
        set
        {
            RaiseAndSetIfChanged(ref this.hasStuff, value, () => HasStuff);
        }
    }

}

这是一个示例视图。
<Window x:Class="RxCommand.MainWindow"
        xmlns="http://schemas.microsoft.com/winfx/2006/xaml/presentation"
        xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml"
        Title="MainWindow" Height="350" Width="525">
    <Grid>
        <StackPanel Orientation="Horizontal" Height="60" VerticalAlignment="Top">
            <CheckBox IsChecked="{Binding HasStuff, Mode=TwoWay}" Margin="10"></CheckBox>
            <TextBox Text="{Binding Title, Mode=TwoWay, UpdateSourceTrigger=PropertyChanged}" Width="150" Margin="10"></TextBox>
            <Button Command="{Binding SomeCommand}" Width="150" Margin="10"></Button>


        </StackPanel>
    </Grid>
</Window>

我猜你大部分的代码都是从现有的片段/库中提取的!否则,这就是一堆代码,只是为了简单地执行ICommand所做的事情。;-) - Lee Campbell
我想进一步补充,为了DRY,可以交换StartWith的位置:this.ObserveProperty(x => x.Title).StartWith(this.Title).Select(x => !string.IsNullOrEmpty(x)); - Lee Campbell
李,我完全理解你所说的DRY注释。我会修改它。至于代码堆叠的注释,我不确定我是否理解了。我认为我需要大部分的东西。你认为可以从中删除什么?只是好奇想知道。大部分都是用于观察属性的辅助工具,也许你是指那个? - sacha barber
是的,这些助手内容。ReactiveCommand可能是从ReactiveUI重新实现的,而其他内容似乎来自于Rxx。 - Lee Campbell
辅助工具是从这里那里拼凑而来的(主要是Keith Woods的博客),响应式UI确实有一个响应式命令,但我想能够添加谓词,而它不允许,所以这就是这个SO的全部内容,现在还是一切顺利,再次感谢。 - sacha barber
是的,Keith 是一个很棒的程序员。毫无疑问。 - sacha barber

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接