在.NET中多线程快速高效地处理HTTP请求

3

我从.NET诞生时就开始使用它,也早在并行编程方面有了一定经验...但是,我仍然无法解释这种现象。此代码在生产系统中运行,并且一直在完成其工作,只是希望更好地理解。

我将10个URL传入以下代码以进行并发处理:

    public static void ProcessInParellel(IEnumerable<ArchivedStatus> statuses, 
                                         StatusRepository statusRepository, 
                                         WaitCallback callback, 
                                         TimeSpan timeout)
    {
        List<ManualResetEventSlim> manualEvents = new List<ManualResetEventSlim>(statuses.Count());

        try
        {
            foreach (ArchivedStatus status in statuses)
            {
                manualEvents.Add(new ManualResetEventSlim(false));
                ThreadPool.QueueUserWorkItem(callback,
                                             new State(status, manualEvents[manualEvents.Count - 1], statusRepository));
            }

            if (!(WaitHandle.WaitAll((from m in manualEvents select m.WaitHandle).ToArray(), timeout, false))) 
                throw ThreadPoolTimeoutException(timeout);
        }
        finally
        {
            Dispose(manualEvents);
        }
    }

回调函数类似于:
    public static void ProcessEntry(object state)
    {
        State stateInfo = state as State;

        try
        {
            using (new LogTimer(new TimeSpan(0, 0, 6)))
            {
               GetFinalDestinationForUrl(<someUrl>);
            }
        }
        catch (System.IO.IOException) { }
        catch (Exception ex)
        {

        }
        finally
        {
            if (stateInfo.ManualEvent != null)
                stateInfo.ManualEvent.Set();
        }
    }

每个回调函数都会查看一个URL并遵循一系列重定向(AllowAutoRedirect故意设置为false以处理cookies):
    public static string GetFinalDestinationForUrl(string url, string cookie)
    {
        if (!urlsToIgnore.IsMatch(url))
        {
            HttpWebRequest request = (HttpWebRequest)HttpWebRequest.Create(url);
            request.AllowAutoRedirect = false;
            request.AutomaticDecompression = DecompressionMethods.Deflate | DecompressionMethods.GZip;
            request.Method = "GET";
            request.KeepAlive = false;
            request.Pipelined = false;
            request.Timeout = 5000;

            if (!string.IsNullOrEmpty(cookie))
                request.Headers.Add("cookie", cookie);

            try
            {
                string html = null, location = null, setCookie = null;

                using (WebResponse response = request.GetResponse())
                using (Stream stream = response.GetResponseStream())
                using (StreamReader reader = new StreamReader(stream))
                {
                    html = reader.ReadToEnd();
                    location = response.Headers["Location"];
                    setCookie = response.Headers[System.Net.HttpResponseHeader.SetCookie];
                }

                if (null != location)
                    return GetFinalDestinationForUrl(GetAbsoluteUrlFromLocationHeader(url, location),
                                                    (!string.IsNullOrEmpty(cookie) ? cookie + ";" : string.Empty) + setCookie);



                return CleanUrl(url);
            }
            catch (Exception ex)
            {
                if (AttemptRetry(ex, url))
                    throw;
            }
        }

        return ProcessedEntryFlag;
    }

我会在递归的GetFinalDestinationForUrl调用周围使用高精度的StopWatch,阈值为6秒,通常完成回调所需的时间都在这个范围内。然而,WaitAll对于10个线程的慷慨超时时间(0,0,60)仍然经常超时。异常打印出类似以下信息: System.Exception: Not all threads returned in 60 seconds: Max Worker:32767, Max I/O:1000, Available Worker:32764, Available I/O:1000 at Work.Threading.ProcessInParellel(IEnumerable`1 statuses,StatusRepository statusRepository, WaitCallback callback, TimeSpan timeout) at Work.UrlExpanderWorker.SyncAllUsers() 这是在.NET 4上运行的,所有URL的maxConnections设置为100。我的唯一理论是同步的HttpWebRequest调用可能会阻塞比指定的超时时间更长时间。这是唯一合理的解释。问题是如何以及最好如何在该操作上强制执行真正的超时?是的,我知道递归调用在每次调用时都指定了5秒的超时时间,但是处理给定URL可能需要多次调用。但我几乎从不见到StopWatch警告。对于我看到的20-30个WaitAll超时错误,我可能会看到一个消息表明特定线程花费的时间超过了6秒。如果真正的问题是10个线程累计需要超过60秒,那么我应该看到至少1:1的相关性(如果不是更高)。
            Uri uri = new Uri(url);
            HttpWebRequest request = (HttpWebRequest)HttpWebRequest.Create(uri);
            request.AllowAutoRedirect = false;
            request.AutomaticDecompression = DecompressionMethods.Deflate | DecompressionMethods.GZip;
            request.Method = "GET";
            request.KeepAlive = false;
            request.Pipelined = false;
            request.Timeout = 7000;
            request.CookieContainer = cookies;

            try
            {
                string html = null, location = null;

                using (new LogTimer("GetFinalDestinationForUrl", url, new TimeSpan(0, 0, 10)))
                    using (HttpWebResponse response = (HttpWebResponse)request.GetResponse())
                    using (Stream stream = response.GetResponseStream())
                    using (StreamReader reader = new StreamReader(stream))
                    {
                        html = reader.ReadToEnd();
                        location = response.Headers["Location"];
                        cookies = Combine(cookies, response.Cookies);

                        if (response.ContentLength > 150000 && !response.ContentType.ContainsIgnoreCase("text/html"))
                            log.Warn(string.Format("Large request ({0} bytes, {1}) detected at {2} on level {3}.", response.ContentLength, response.ContentType, url, level));
                    }

这段代码通常记录了需要5-6分钟完成且大小不超过150000的条目。我不是在说这只发生在某个孤立的服务器上,而是在一些随机的(高知名度的)媒体网站上。

到底发生了什么事情,我们该如何确保代码在合理的时间内退出呢?


在第一个代码示例中(设置作业),您使用了 m.WaitHandle。在 ProcessEntry 中,您使用了 stateInfo.ManualEvent。那是打字错误吗? - Chris Shain
1
如果您正在使用.NET 4.0,您应该考虑使用TPL。我认为这将有助于清理代码并使其更容易看到发生了什么。例如:https://dev59.com/glTTa4cB1Zd3GeqPtIe1 这里有一个超时重载。 - Ryan
你是否对所有的URL使用了maxConnection?http://msdn.microsoft.com/en-us/library/fb6y0fyc.aspx - Aliostad
chris: 不是打字错误。aliostad: 所有的URL都是100,是的。ryan: 有点想转移到TPL,但不确定是否有任何显著的收益,也没有理由为什么这段代码不能工作。 - Nariman
2个回答

2
我同意Aliostad的看法。我没有看到任何明显问题。你是否设置了某种锁定机制,导致这些工作项被串行化?我没有在表面上看到任何问题,但最好再仔细检查一下,以防你的代码比你发布的更复杂。你需要添加日志记录代码,以捕获这些HTTP请求启动时的时间。希望这能给你提供更多线索。
另外,无关的是,我通常避免使用WaitHandle.WaitAll。它有一些限制,比如只允许64个句柄,并且不能在单线程单元(Apartment)中工作。就我而言,我使用以下模式代替。
using (var finished = new CountdownEvent(1);
{
  foreach (var item in workitems)
  {
    var capture = item;
    finished.AddCount();
    ThreadPool.QueueUserWorkItem(
      () =>
      {
        try
        {
          ProcessWorkItem(capture);
        }
        finally
        {
          finished.Signal();
        }
      }
  }
  finished.Signal();
  if (!finished.Wait(timeout))
  {
    throw new ThreadPoolTimeoutException(timeout);
  }
}

1

我已经全面审查了您的代码。就目前而言,我没有发现任何问题。

因此,似乎存在其他问题,但为了处理它,我建议:

GetFinalDestinationForUrl 的开头和结尾编写跟踪、调试或控制台输出,并在跟踪中包括 URL。

这应该有助于您确定问题所在。如果 HttpWebRequest 没有遵守您的 5 秒超时或 .NET 没有遵守您的 100 并发连接,则这将对您有所帮助。

更新您的问题并附上结果,我会再次进行审查。


更新

我已经审查了您的新增功能。很好地隔离了问题:现在确认WaitAll不遵守您的超时时间。

这现在似乎是一个微软支持问题,值得向他们提出 - 除非其他人能够发现此细节的问题。(值得请Eric LippertJon Skeet阅读此问题)

根据我的个人经验,即使我向他们发送了重现代码并且他们重现了它,我也没有得到回复。那是BizTalk,而这是.NET框架,所以我想您可能会得到更好的回应。


我的粗略理论

我也有一个粗略的理论,即在高负载和最大上下文切换时,负责等待的线程没有得到预期的上下文时间,因此它没有机会超时并中止所有这些线程。另一个理论是忙于IO操作的线程需要更长的时间才能中止,并且无法响应中止请求。现在正如我所说的那样,这个理论很粗略,证明或解决它远远超出了我的职权范围。


@Nariman 好的,请看一下。正如我所提到的,最好在这里包括像 Skeet 和 Lippert 这样的大师。 - Aliostad
感谢@aliostad。我的意思是强调HttpWebRequest没有尊重超时 - 对于不大于150000字节的请求,它需要5-6分钟才能完成。这是Azure->高知名度媒体网站。 - Nariman

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接