使用Rx和SelectMany限制并发请求

5

我有一个URL列表,想要使用HttpClient并发下载这些网页。URL列表可能很大(100个或更多!)

我目前有以下代码:

var urls = new List<string>
            {
                @"http:\\www.amazon.com",
                @"http:\\www.bing.com",
                @"http:\\www.facebook.com",
                @"http:\\www.twitter.com",
                @"http:\\www.google.com"
            };

var client = new HttpClient();

var contents = urls
    .ToObservable()
    .SelectMany(uri => client.GetStringAsync(new Uri(uri, UriKind.Absolute)));

contents.Subscribe(Console.WriteLine);

问题:由于使用了 SelectMany,几乎同时创建了一大堆任务。如果 URL 列表足够大,则许多任务会超时(我得到了“一个任务已取消”的异常)。因此,我认为应该有一种方法,可以使用某种类型的调度程序来限制并发任务的数量,不允许在给定时间内超过 5 或 6 个。这样,我就可以获得并发下载,而不启动太多可能会停顿的任务,就像它们现在所做的那样。如何做到这一点,以免饱和出现大量超时任务?

1
你可能想考虑使用DataFlow API。 - Yacoub Massad
你能用我的代码集成它吗?我不知道如何使用DataFlow来做。说实话,我从未使用过它,但看一些示例会很有帮助。 - SuperJMN
3个回答

15

记住SelectMany()实际上是Select().Merge()。虽然SelectMany没有maxConcurrent参数,但Merge()有。因此你可以使用它。

从您的示例中,您可以这样做:

var urls = new List<string>
    {
        @"http:\\www.amazon.com",
        @"http:\\www.bing.com",
        @"http:\\www.facebook.com",
        @"http:\\www.twitter.com",
        @"http:\\www.google.com"
    };

var client = new HttpClient();

var contents = urls
    .ToObservable()
    .Select(uri => Observable.FromAsync(() => client.GetStringAsync(uri)))
    .Merge(2); // 2 maximum concurrent requests!

contents.Subscribe(Console.WriteLine);

3
这是一个使用DataFlow API的示例:
private static Task DoIt()
{
    var urls = new List<string>
    {
        @"http:\\www.amazon.com",
        @"http:\\www.bing.com",
        @"http:\\www.facebook.com",
        @"http:\\www.twitter.com",
        @"http:\\www.google.com"
    };

    var client = new HttpClient();

    //Create a block that takes a URL as input
    //and produces the download result as output
    TransformBlock<string,string> downloadBlock =
        new TransformBlock<string, string>(
            uri => client.GetStringAsync(new Uri(uri, UriKind.Absolute)),
            new ExecutionDataflowBlockOptions
            {
                //At most 2 download operation execute at the same time
                MaxDegreeOfParallelism = 2
            }); 

    //Create a block that prints out the result
    ActionBlock<string> doneBlock =
        new ActionBlock<string>(x => Console.WriteLine(x));

    //Link the output of the first block to the input of the second one
    downloadBlock.LinkTo(
        doneBlock,
        new DataflowLinkOptions { PropagateCompletion = true});

    //input the urls into the first block
    foreach (var url in urls)
    {
        downloadBlock.Post(url);
    }

    downloadBlock.Complete(); //Mark completion of input

    //Allows consumer to wait for the whole operation to complete
    return doneBlock.Completion;
}

static void Main(string[] args)
{
    DoIt().Wait();
    Console.WriteLine("Done");
    Console.ReadLine();
}

哇,看起来真的很不错,但我想知道如何使用 Rx 做相同的事情。提前感谢! - SuperJMN

1
你能看一下这是否有帮助吗?
var urls = new List<string>
        {
            @"http:\\www.amazon.com",
            @"http:\\www.bing.com",
            @"http:\\www.google.com",
            @"http:\\www.twitter.com",
            @"http:\\www.google.com"
        };

var contents =
    urls
        .ToObservable()
        .SelectMany(uri =>
            Observable
                .Using(
                    () => new System.Net.Http.HttpClient(),
                    client =>
                        client
                            .GetStringAsync(new Uri(uri, UriKind.Absolute))
                            .ToObservable()));

抱歉,它的表现不太好。一百个任务因超时而被取消 :( - SuperJMN
你可以尝试使用 EventLoopScheduler 吗? - Enigmativity
谢谢。我已经尝试过了,结果一样。请看@Dorus的答案,因为它简单易行,而且没有太多麻烦就能达到预期效果。 - SuperJMN
@Enigmativity,你介意看一下吗? http://stackoverflow.com/questions/37437657/c-sharp-reactive-extensions-memory-management-and-the-distinct-operator - eran otzap

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接