使用响应式扩展在TextChanged事件中进行搜索

10

我正在尝试在一个拥有10000多条记录的数据库表上实现即时搜索。

当搜索文本框内的文本发生变化时,搜索就会开始。当搜索框变为空时,我想调用另一个方法来加载所有数据。

此外,如果用户在加载另一个搜索结果时更改了搜索字符串,则应停止加载那些结果,并转而使用新搜索。

我已经按照以下代码进行了实现,但我想知道是否有更好或更清晰的方法来使用 Rx(响应式扩展)运算符来完成此项工作。我觉得在第一个可观察对象的订阅方法中创建第二个可观察对象比较命令式,对于那个if语句也是一样。

var searchStream = Observable.FromEventPattern(s => txtSearch.TextChanged += s, s => txtSearch.TextChanged -= s)
    .Throttle(TimeSpan.FromMilliseconds(300))
    .Select(evt =>
        {
            var txtbox = evt.Sender as TextBox;
            return txtbox.Text;
        }
    );

searchStream
    .DistinctUntilChanged()
    .ObserveOn(SynchronizationContext.Current)
    .Subscribe(searchTerm =>
        {
            this.parties.Clear();
            this.partyBindingSource.ResetBindings(false);
            long partyCount;
            var foundParties = string.IsNullOrEmpty(searchTerm) ? partyRepository.GetAll(out partyCount) : partyRepository.SearchByNameAndNotes(searchTerm);

            foundParties
                .ToObservable(Scheduler.Default)
                .TakeUntil(searchStream)
                .Buffer(500)
                .ObserveOn(SynchronizationContext.Current)
                .Subscribe(searchResults =>
                    {
                        this.parties.AddRange(searchResults);
                        this.partyBindingSource.ResetBindings(false);
                    }
                    , innerEx =>
                    {

                    }
                    , () => { }
                );
        }
        , ex =>
        {
        }
        , () =>
        {

        }
    );

SearchByNameAndNotes 方法只是通过从数据读取器中读取数据,在 SQLite 中返回一个 IEnumerable<Party>


1
SearchAsync 究竟在做什么? - cwharris
1
为什么SearchByNameAndNotes方法会从SearchAsync和Subscribe方法中被调用? - cwharris
@ChristopherHarris 感谢您的笔记,对于第二个错误我深表歉意,那个 SelectMany 子句是另一次尝试的一部分。它在我的原始代码中有注释,问题的最终版本没有错误(希望如此)。 - Ibrahim Najjar
1个回答

19
我认为您想要的是这样的内容。编辑:从您的评论中,我看到您拥有一个同步存储库API - 我会保留异步版本,并随后添加同步版本。请参考以下说明:

异步存储库版本

异步存储库接口可能如下所示:

public interface IPartyRepository
{
    Task<IEnumerable<Party>> GetAllAsync(out long partyCount);
    Task<IEnumerable<Party>> SearchByNameAndNotesAsync(string searchTerm);
}

然后我将查询重构为:

var searchStream = Observable.FromEventPattern(
    s => txtSearch.TextChanged += s,
    s => txtSearch.TextChanged -= s)
    .Select(evt => txtSearch.Text) // better to select on the UI thread
    .Throttle(TimeSpan.FromMilliseconds(300))
    .DistinctUntilChanged()
    // placement of this is important to avoid races updating the UI
    .ObserveOn(SynchronizationContext.Current)
    .Do(_ =>
    {
        // I like to use Do to make in-stream side-effects explicit
        this.parties.Clear();
        this.partyBindingSource.ResetBindings(false);
    })
    // This is "the money" part of the answer:
    // Don't subscribe, just project the search term
    // into the query...
    .Select(searchTerm =>
    {
        long partyCount;
        var foundParties = string.IsNullOrEmpty(searchTerm)
            ? partyRepository.GetAllAsync(out partyCount)
            : partyRepository.SearchByNameAndNotesAsync(searchTerm);

        // I assume the intention of the Buffer was to load
        // the data into the UI in batches. If so, you can use Buffer from nuget
        // package Ix-Main like this to get IEnumerable<T> batched up
        // without splitting it up into unit sized pieces first
        return foundParties
            // this ToObs gets us into the monad
            // and returns IObservable<IEnumerable<Party>>
            .ToObservable()
            // the ToObs here gets us into the monad from
            // the IEnum<IList<Party>> returned by Buffer
            // and the SelectMany flattens so the output
            // is IObservable<IList<Party>>
            .SelectMany(x => x.Buffer(500).ToObservable())
            // placement of this is again important to avoid races updating the UI
            // erroneously putting it after the Switch is a very common bug
            .ObserveOn(SynchronizationContext.Current); 
    })
    // At this point we have IObservable<IObservable<IList<Party>>
    // Switch flattens and returns the most recent inner IObservable,
    // cancelling any previous pending set of batched results
    // superceded due to a textbox change
    // i.e. the previous inner IObservable<...> if it was incomplete
    // - it's the equivalent of your TakeUntil, but a bit neater
    .Switch() 
    .Subscribe(searchResults =>
    {
        this.parties.AddRange(searchResults);
        this.partyBindingSource.ResetBindings(false);
    },
    ex => { },
    () => { });

同步存储库版本

同步存储库接口可能如下所示:

public interface IPartyRepository
{
    IEnumerable<Party> GetAll(out long partyCount);
    IEnumerable<Party> SearchByNameAndNotes(string searchTerm);
}

个人而言,我不建议像这样同步使用存储库接口。为什么呢?通常它会执行IO操作,因此您将浪费一个线程的时间。
您可能会说客户端可以从后台线程调用,或者您可以在任务中包装它们的调用-但我认为这不是正确的方法。
  • 客户端不知道您将要阻塞;它没有在合同中表达
  • 应该是存储库处理实现的异步方面-毕竟,如何最好地实现只有存储库实现者才最清楚。
无论如何,接受上述内容,一种实现方式如下(当然,它与异步版本大部分相似,因此我只注释了差异):
var searchStream = Observable.FromEventPattern(
    s => txtSearch.TextChanged += s,
    s => txtSearch.TextChanged -= s)
    .Select(evt => txtSearch.Text)
    .Throttle(TimeSpan.FromMilliseconds(300))
    .DistinctUntilChanged()
    .ObserveOn(SynchronizationContext.Current)
    .Do(_ =>
    {
        this.parties.Clear();
        this.partyBindingSource.ResetBindings(false);
    })       
    .Select(searchTerm =>
        // Here we wrap the synchronous repository into an
        // async call. Note it's simply not enough to call
        // ToObservable(Scheduler.Default) on the enumerable
        // because this can actually still block up to the point that the
        // first result is yielded. Doing as we have here,
        // we guarantee the UI stays responsive
        Observable.Start(() =>
        {
            long partyCount;
            var foundParties = string.IsNullOrEmpty(searchTerm)
                ? partyRepository.GetAll(out partyCount)
                : partyRepository.SearchByNameAndNotes(searchTerm);

            return foundParties;
        }) // Note you can supply a scheduler, default is Scheduler.Default
        .SelectMany(x => x.Buffer(500).ToObservable())
        .ObserveOn(SynchronizationContext.Current))
    .Switch()
    .Subscribe(searchResults =>
    {
        this.parties.AddRange(searchResults);
        this.partyBindingSource.ResetBindings(false);
    },
    ex => { },
    () => { });  

1
好的,我添加了一个关于这个的段落(“如果你想保留...”)。我的偏好是将异步性与仓库绑定在一起,因为它模拟了一个固有的异步任务。 - James World
1
我认为你看到你的版本比詹姆斯的更具有响应性的原因是你使用了Scheduler.Default来运行存储库加载。这是詹姆斯所说的可以在另一个线程上进行加载的可能性。正如詹姆斯所说,如果存储库提供异步API,设计将会更加整洁。 - Niall Connaughton
1
如果您有一个直接提供IObservable的数据层(这比IEnumerable更有意义),那么IO.Buffer也更有意义。这些确实存在 - 例如SignalR、StreamInsight、Terracotta UM。如果它们本地缓冲并直接返回IO<IE<T>>,那就更好了。 - James World
2
换句话说,James的评论是,你的foundParties.ToObservable.TakeUntil.Buffer链将可枚举的parties转换为Observable。这些parties被逐个提供给Buffer,它将把它们转换为一系列可枚举(缓冲区)。但是foundParties已经是一个可枚举的了。因此,从该可枚举中取出块比将其转换为Observable,逐个读取,批量处理并作为可枚举返回更轻便。 - Niall Connaughton
2
然而,如果数据 API 能够异步读取部分结果,它可以返回一个 Observable,您可以使用 IO 缓冲区。 - Niall Connaughton
显示剩余13条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接