使用响应式扩展(Rx)创建REST客户端API

13

我试图理解响应式扩展(Rx)的正确使用场景。一些常见实例是处理UI事件(例如拖放和绘制),以及建议使用Rx处理异步应用程序/操作,比如Web服务调用。

我正在开发一个需要为REST服务编写小型客户端API的应用程序。我需要调用四个REST端点,其中三个用于获取一些参考数据(机场、航空公司和状态),第四个是主要服务,可以根据给定机场提供航班时间。

我创建了暴露三个参考数据服务的类,并且这些方法看起来像这样:

public Observable<IEnumerable<Airport>> GetAirports()
public Observable<IEnumerable<Airline>> GetAirlines()
public Observable<IEnumerable<Status>> GetStatuses()
public Observable<IEnumerable<Flights>> GetFlights(string airport)

在我的GetFlights方法中,我希望每个航班都持有一个出发机场和操作该航班的航空公司的引用。为此,我需要从GetAirports和GetAirlines获取数据。每个机场、航空公司和状态都将被添加到一个字典中,以便在解析每个航班时轻松地设置引用。

flight.Airport = _airports[flightNode.Attribute("airport").Value]
flight.Airline = _airlines[flightNode.Attribute("airline").Value]
flight.Status = _statuses[flightNode.Attribute("status").Value]

我的当前实现看起来像这样:

public IObservable<IEnumerable<Flight>> GetFlightsFrom(Airport fromAirport)
{
    var airports = new AirportNamesService().GetAirports();
    var airlines = new AirlineNamesService().GetAirlines();
    var statuses = new StatusService().GetStautses();


    var referenceData = airports
        .ForkJoin(airlines, (allAirports, allAirlines) =>
                            {
                                Airports.AddRange(allAirports);
                                Airlines.AddRange(allAirlines);
                                return new Unit();
                            })
        .ForkJoin(statuses, (nothing, allStatuses) =>
                            {
                                Statuses.AddRange(allStatuses);
                                return new Unit();
                            });

    string url = string.Format(_serviceUrl, 1, 7, fromAirport.Code);

    var flights = from data in referenceData
                    from flight in GetFlightsFrom(url)
                    select flight;

    return flights;
}

private IObservable<IEnumerable<Flight>> GetFlightsFrom(string url)
{
    return WebRequestFactory.GetData(new Uri(url), ParseFlightsXml);
}

当前的实现基于Sergey的回答,使用ForkJoin确保顺序执行,并且在Flights之前加载引用数据。这个实现比我以前的实现要更加优雅,不需要触发“ReferenceDataLoaded”事件。


答案已更新 - 另外,请查看此线程:http://social.msdn.microsoft.com/Forums/en/rx/thread/20e9fea1-304f-4926-aa02-49ed558a84f5 - 显示如何编写自定义缓冲。 - Sergey Aldoukhov
2个回答

2

我认为,如果你从每个REST调用中收到实体列表,那么你的调用应该有一些不同的签名 - 你不是观察返回集合中的每个值,而是观察调用完成的事件。因此对于机场,它应该具有以下签名:

public IObservable<Aiports> GetAirports()

下一步将并行运行前三个,并等待它们全部完成:
var ports_lines_statuses = 
    Observable.ForkJoin(GetAirports(), GetAirlines(), GetStatuses());

第三步是将上述可观察对象与GetFlights()组合起来:
var decoratedFlights = 
  from pls in ports_lines_statuses
  let airport = MyAirportFunc(pls)
  from flight in GetFlights(airport)
  select flight;

编辑:我仍然不明白为什么您的服务会返回此结果。
IObservable<Airport> 

替代

IObservable<IEnumerable<Airport>>

据我所知,从REST调用中一次性获取所有实体 - 但也许你会分页呢? 无论如何,如果您想让RX进行缓冲,可以使用.BufferWithCount()方法:
    var allAirports = new AirportNamesService()
        .GetAirports().BufferWithCount(int.MaxValue); 
...

然后您可以应用ForkJoin:

var ports_lines_statuses =  
    allAirports
        .ForkJoin(allAirlines, PortsLinesSelector)
        .ForkJoin(statuses, ...

ports_lines_statuses将包含时间轴上的一个单一事件,其中包含所有参考数据。

编辑:这里是另一个例子,使用最新发布的ListObservable:

allAiports = airports.Start(); 
allAirlines = airlines.Start();
allStatuses = statuses.Start();

...
whenReferenceDataLoaded =
  Observable.Join(airports.WhenCompleted()
                 .And(airlines.WhenCompleted())
                 .And(statuses.WhenCompleted())
                 Then((p, l, s) => new Unit())); 



    public static IObservable<Unit> WhenCompleted<T>(this IObservable<T> source)
    {
        return source
            .Materialize()
            .Where(n => n.Kind == NotificationKind.OnCompleted)
            .Select(_ => new Unit());
    }

我实际上想要首先“一次性”获取所有航空公司、机场和状态,因为当我获取航班时,我需要这三个参考数据集存在,以便我可以将它们链接到一个航班。所以我需要将机场获取到像这样的字典中:Dictionary<string, Airport>,这样我就可以执行:flight.Airport = airports[flightXml.AirportCode]。 - Jonas Follesø
我更新了问题并添加了新的方法签名。你是对的,我实际上想要一次性获取所有机场、航空公司和状态。PortLinesSelector是一个将机场和航空公司组合起来的方法,我需要第二个方法将前面的结果与新结果组合起来,这样理解是否正确?我尝试下载最新版本的RX for Silverlight 3/4,但在Observable上找不到Start()方法(只有start with)。 - Jonas Follesø
@jonas-folleso 是的, PortsLinesSelector 类似于 (ports,lines)=> new {ports,lines},第二个选择器将将第三个结果附加到此结果。 这里的想法是尽可能保持函数式风格,只需通过管道传递数据,而不是使用本地变量。 observable 上的 Start() 仅在 .Net4 发布版中才有,因此您必须等待其移植到其他版本... - Sergey Aldoukhov
好的,太棒了。你绝对指引了我正确的方向,由于我得到了一个相当好的解决方案,我会将这个问题标记为已回答。非常感谢! - Jonas Follesø

0
这里的使用情况是基于拉取的 - 使用 IEnumerable 就可以了。如果你想说,通知一个新航班到来的地方,那么在 Observable.Generate 中包装一个基于拉取的 REST 调用可能会有一些价值。

那么在我的情况下,Rx不是构建REST客户端的好方法?由于这是WP7,我无法使其同步,因此替代方法将是执行:GetAirlinesAsync,并具有GetAirlinesCompleted事件。然后,我将不得不调用GetAirlinesAsync、GetAirportsAsync和GetStatusesAsync,并等待所有三个回调事件触发后再调用GetFlights..?我还计划扩展我的方法,每3分钟重新调用GetFlights服务以进行刷新。因此,观察到新的飞行对象的到来似乎是一个好主意..? - Jonas Follesø
如果底层API仅基于异步操作,则使用RX更加合理。Observable.Generate... - Scott Weinstein

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接