使用多线程向Web服务发送HTTP POST请求

7
我想在C#中向Web服务发送多个HTTP POST请求。例如,如果n=3,则应分别用3个xml文件发起HTTP POST请求,并把响应写入文件。前3个请求全部完成之后,再发起接下来的3个请求。因此,我编写了以下代码。起初我得到的是随机的输出;而现在,我要么在内部for循环中遇到索引超出范围异常,要么得到内部服务器错误(500)。请建议适当的修改。我使用的是.NET 4.0。
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.IO;
using System.Threading;
using System.Xml;
using System.Net;
using System.Threading.Tasks;
namespace ConsoleApplication5
{
/// <summary>
/// Console tool that POSTs every *.xml file found in C:\ to the ADService web
/// service, sending the files in batches of a user-chosen size and waiting for
/// each batch to finish before starting the next one.
/// </summary>
class Program
{
    /// <summary>
    /// Reads the batch size from the console, then posts the XML files batch by batch.
    /// </summary>
    /// <param name="args">Command-line arguments (unused).</param>
    static void Main(string[] args)
    {
        Console.WriteLine("Enter the number");
        string s = Console.ReadLine();

        int n;
        // A non-numeric or non-positive batch size would make the outer loop
        // below advance by zero (or backwards) and never terminate.
        if (!int.TryParse(s, out n) || n <= 0)
        {
            Console.WriteLine("The number must be a positive integer.");
            return;
        }

        string path = "C:\\";
        string[] files = Directory.GetFiles(path, "*.xml", SearchOption.TopDirectoryOnly);

        // A batch can never hold more files than exist, so clamp the size;
        // this also keeps the list capacity sane for very large inputs.
        int batchSize = Math.Min(n, files.Length);
        List<Task> tasks = new List<Task>(batchSize);

        for (int i = 0; i < files.Length; i += batchSize)
        {
            for (int j = 0; j < batchSize; j++)
            {
                // Declared inside the loop so each lambda captures its own copy.
                int x = i + j;

                if (x < files.Length && files[x] != null)
                {
                    Task t = new Task(() => function(files[x]));
                    t.Start();
                    tasks.Add(t);
                }
            }

            if (tasks.Count > 0)
            {
                // Block until the whole batch is done before starting the next.
                // Any failure in a task surfaces here as an AggregateException.
                Task.WaitAll(tasks.ToArray(), Timeout.Infinite);
                tasks.Clear();
            }
        }

        // Keep the console window open once, after all batches have finished.
        // Doing this inside the worker would stall every parallel request on a key press.
        Console.ReadKey();
    }

    /// <summary>
    /// Posts the XML document stored in <paramref name="temp"/> to the web service,
    /// overwrites that same file with the inner text of the response's root element,
    /// and echoes the raw response to the console.
    /// </summary>
    /// <param name="temp">Path of the XML file to send; it is overwritten with the response text.</param>
    public static void function(string temp)
    {
        XmlDocument doc = new XmlDocument();
        doc.Load(temp);
        HttpWebRequest request = (HttpWebRequest)WebRequest.Create("http://10.76.22.135/wpaADws/ADService.asmx");

        request.ContentType = "text/xml;charset=\"utf-8\"";
        request.Accept = "text/xml";
        request.Method = "POST";

        // using-blocks guarantee the streams are closed even if Save/Read throws.
        using (Stream stream = request.GetRequestStream())
        {
            doc.Save(stream);
        }

        using (HttpWebResponse response = (HttpWebResponse)request.GetResponse())
        using (StreamReader rd = new StreamReader(response.GetResponseStream()))
        {
            string soapResult = rd.ReadToEnd();
            doc.LoadXml(soapResult);
            File.WriteAllText(temp, doc.DocumentElement.InnerText);

            Console.WriteLine(soapResult);
        }
    }
}

}

3个回答

4
这段代码是有效的。 说明:
  • Firstly the user gives the source and destination paths for the .xml files.
  • Directory.GetFiles() returns the matching .xml files as a string array (we pass "*.xml" as the search pattern).

  • So, for each file found at the source path, a thread is created.

  • But if the user wants to send "n" requests at a time, then only n threads are created at a time.
  • And the next set of threads are not created unless the previous threads are finished executing.
  • This is ensured by thread.Join().
  • After a request is made to the web service, we get the response via GetResponse(), and the response is written to .xml files stored at the destination path.

     using System;
     using System.Collections.Generic;
     using System.Linq;
     using System.Text;
     using System.IO;
     using System.Threading;
     using System.Xml;
     using System.Net;
     namespace ConsoleApplication4
     {
         /// <summary>
         /// Console tool that POSTs every *.xml file in a source folder to the ADService
         /// web service, n requests at a time, one thread per request, and writes each
         /// response to a numbered file under the destination prefix.
         /// </summary>
         class Program
         {
             // Number used for the next response file. It is shared by all worker
             // threads, so it is only advanced through Interlocked.Increment.
             int flag = 1;
             // Prefix for response files. It is concatenated as-is, so it must end
             // with a directory separator to place the files inside a folder.
             string destination;
             // Folder that is scanned for the *.xml request files.
             string source;

             /// <summary>
             /// Prompts for the batch size and the source/destination paths, then sends
             /// the files in batches, joining every thread of a batch before starting
             /// the next batch.
             /// </summary>
             /// <param name="args">Command-line arguments (unused).</param>
             static void Main(string[] args)
             {
                 Console.ForegroundColor = ConsoleColor.Red;

                 Console.WriteLine("**************************** Send HTTP Post Requests **************************");
                 Program p = new Program();
                 Console.WriteLine("Enter the number of requests you want to send at a time");
                 string s = Console.ReadLine();

                 int n;
                 // Without this check a non-numeric or non-positive value leaves the
                 // batch loop advancing by zero, i.e. spinning forever.
                 if (!int.TryParse(s, out n) || n <= 0)
                 {
                     Console.WriteLine("The number of requests must be a positive integer.");
                     return;
                 }

                 Console.WriteLine("Enter Source");
                 p.source = Console.ReadLine();
                 Console.WriteLine("Enter Destination");
                 p.destination = Console.ReadLine();

                 string[] files = Directory.GetFiles(p.source, "*.xml", SearchOption.TopDirectoryOnly);

                 Thread[] thread = new Thread[files.Length];

                 int len = files.Length;
                 // A batch can never be larger than the number of files.
                 int batchSize = Math.Min(n, len);

                 for (int i = 0; i < len; i += batchSize)
                 {
                     // Exclusive end of this batch; the last batch may be shorter.
                     int end = Math.Min(i + batchSize, len);

                     for (int x = i; x < end; x++)
                     {
                         // Copy the index so the lambda does not see later values of x.
                         int localx = x;
                         thread[x] = new Thread(() => function(files[localx], p));
                         thread[x].Start();
                         // Staggers the thread starts by 50 ms, as the original did.
                         Thread.Sleep(50);
                     }

                     // Join exactly the threads started above. Computing the start as
                     // "x - n" went negative whenever the batch was shorter than n.
                     for (int y = i; y < end; y++)
                     {
                         thread[y].Join();
                     }
                 }

                 Console.ReadKey();
             }

             /// <summary>
             /// Posts the XML document stored in <paramref name="temp"/> to the web
             /// service and writes the inner text of the response's root element to
             /// "&lt;destination&gt;response &lt;number&gt;.xml", echoing the raw response to
             /// the console.
             /// </summary>
             /// <param name="temp">Path of the XML file to send.</param>
             /// <param name="p">Program instance carrying the destination prefix and the shared file counter.</param>
             public static void function(string temp, Program p)
             {
                 XmlDocument doc = new XmlDocument();
                 doc.Load(temp);

                 // Atomically reserve a unique number; a plain p.flag++ can hand the
                 // same number to two threads and make them overwrite each other's file.
                 int number = Interlocked.Increment(ref p.flag) - 1;
                 string final_d = p.destination + "response " + number + ".xml";

                 HttpWebRequest request = (HttpWebRequest)WebRequest.Create("http://10.76.22.135/wpaADws/ADService.asmx");
                 request.ContentType = "text/xml;charset=\"utf-8\"";
                 request.Accept = "text/xml";
                 request.Method = "POST";

                 // using-blocks guarantee the streams are closed even on failure.
                 using (Stream stream = request.GetRequestStream())
                 {
                     doc.Save(stream);
                 }

                 using (HttpWebResponse response = (HttpWebResponse)request.GetResponse())
                 using (StreamReader rd = new StreamReader(response.GetResponseStream()))
                 {
                     string soapResult = rd.ReadToEnd();
                     doc.LoadXml(soapResult);
                     File.WriteAllText(final_d, doc.DocumentElement.InnerText);

                     Console.WriteLine(soapResult);
                 }
             }
         }
     }


2
你在原始帖子中遇到的IndexOutOfRangeException是由于你在处理最后一批文件时未正确处理索引。最后一批文件可能是不完整的,而你将其视为普通大小的批次(例如,在你的帖子中n=3)。
由于你正在转向TPL和Tasks,我建议你参考Microsoft .NET并行编程中的管道(Pipeline)模式,它似乎非常适合你的情况。你可以像下面这样,把并发集合、生产者/消费者模式与管道结合起来使用。BlockingCollection保证可以并发地添加项目,而调用BlockingCollection.GetConsumingEnumerable会为你的集合生成一个阻塞式的消费枚举器。
// Bounded capacity given to each BlockingCollection between pipeline stages.
const int BUFFER_SIZE = 3;
// Folder scanned for the *.xml request files; replace the placeholder with a real path.
const string XML_FOLDER_PATH = "<whatever>";


/// <summary>
/// Runs a five-stage producer/consumer pipeline over the *.xml files in
/// XML_FOLDER_PATH: list files, load each as an XmlDocument, POST it to the web
/// service, read the response, and print the response to the console.
/// Stages run as long-running tasks connected by bounded BlockingCollections.
/// Blocks until the last stage has finished; if any stage fails, the whole
/// pipeline is cancelled and the failure surfaces as an AggregateException.
/// </summary>
public static void Pipeline()
{
  var bufferXmlFileNames = new BlockingCollection<string>(BUFFER_SIZE);
  var bufferInputXmlDocuments = new BlockingCollection<XmlDocument>(BUFFER_SIZE);
  var bufferWebRequests = new BlockingCollection<HttpWebRequest>(BUFFER_SIZE);
  var bufferSoapResults = new BlockingCollection<string>(BUFFER_SIZE);

  // Shared cancellation: when one stage fails, the others must stop too.
  // Otherwise a producer can block forever on Add() against a full buffer
  // whose consumer has died, and Task.WaitAll below would never return.
  var cancellation = new System.Threading.CancellationTokenSource();

  var f = new TaskFactory(TaskCreationOptions.LongRunning, TaskContinuationOptions.None);

  // Stage 1: get xml file paths
  var stage1 = f.StartNew(() =>
  {
    try
    {
      foreach (var phrase in Directory.GetFiles(XML_FOLDER_PATH, "*.xml", SearchOption.TopDirectoryOnly))
      {
        bufferXmlFileNames.Add(phrase, cancellation.Token);
      }
    }
    catch
    {
      cancellation.Cancel();
      throw;
    }
    finally
    { // no more additions accepted
      bufferXmlFileNames.CompleteAdding();
    }
  });

  // Stage 2: load each file into an XmlDocument
  var stage2 = f.StartNew(() =>
  {
    try
    {
      foreach (var xmlFileName in bufferXmlFileNames.GetConsumingEnumerable(cancellation.Token))
      {
        XmlDocument doc = new XmlDocument();
        doc.Load(xmlFileName);
        bufferInputXmlDocuments.Add(doc, cancellation.Token);
      }
    }
    catch
    {
      cancellation.Cancel();
      throw;
    }
    finally
    {
      bufferInputXmlDocuments.CompleteAdding();
    }
  });

  // Stage 3: build the POST request and write the document into its body
  var stage3 = f.StartNew(() =>
  {
    try
    {
      foreach (var xmlDoc in bufferInputXmlDocuments.GetConsumingEnumerable(cancellation.Token))
      {
        HttpWebRequest request = (HttpWebRequest)WebRequest.Create("http://10.76.22.135/wpaADws/ADService.asmx");
        request.ContentType = "text/xml;charset=\"utf-8\"";
        request.Accept = "text/xml";
        request.Method = "POST";

        // using guarantees the request stream is closed even if Save throws.
        using (Stream stream = request.GetRequestStream())
        {
          xmlDoc.Save(stream);
        }

        bufferWebRequests.Add(request, cancellation.Token);
      }
    }
    catch
    {
      cancellation.Cancel();
      throw;
    }
    finally
    {
      bufferWebRequests.CompleteAdding();
    }
  });

  // Stage 4: obtain each response and read it fully as text
  var stage4 = f.StartNew(() =>
  {
    try
    {
      foreach (var postRequest in bufferWebRequests.GetConsumingEnumerable(cancellation.Token))
      {
        using (HttpWebResponse response = (HttpWebResponse)postRequest.GetResponse())
        using (StreamReader rd = new StreamReader(response.GetResponseStream()))
        {
          string soapResult = rd.ReadToEnd();
          bufferSoapResults.Add(soapResult, cancellation.Token);
        }
      }
    }
    catch
    {
      cancellation.Cancel();
      throw;
    }
    finally
    {
      bufferSoapResults.CompleteAdding();
    }
  });

  // Stage 5: update UI
  var stage5 = f.StartNew(() =>
  {
    try
    {
      foreach (var soapResult in bufferSoapResults.GetConsumingEnumerable(cancellation.Token))
      {
        Console.WriteLine(soapResult);
      }
    }
    catch
    {
      cancellation.Cancel();
      throw;
    }
  });

  // Informational only: periodically prints how many items are waiting in each
  // buffer. It can be removed without affecting the pipeline.
  var stageDisplay = f.StartNew(() =>
  {
    do
    {
      Console.WriteLine("{0,10} {1,10} {2,10} {3,10}", bufferXmlFileNames.Count, bufferInputXmlDocuments.Count, bufferWebRequests.Count, bufferSoapResults.Count);
      // Pause between samples; an unthrottled loop pegs a CPU core and
      // buries the real output under thousands of status lines.
      System.Threading.Thread.Sleep(250);
    }
    while (!stage5.IsCompleted); // stop once the last stage has finished
  });

  // stageDisplay is deliberately not awaited; it exits on its own shortly after stage5.
  Task.WaitAll(stage1, stage2, stage3, stage4, stage5);
}

你能告诉我应该引用哪些命名空间吗?因为你的代码里有很多对我来说很新的概念。 - Tanay Narkhede
好的,我明白了命名空间...你能解释一下输出结果吗?我没看懂。 这是截图链接: http://prntscr.com/7fbpna http://prntscr.com/7fbp4f - Tanay Narkhede

1
如何使用类似这样的任务:
    // Holds the tasks of the batch currently in flight (at most n of them).
    List<Task> tasks = new List<Task>(n);

    // Walk the file list one batch of n files at a time.
    for (int i = 0; i < files.Length; i += n)
    {
        for (int j = 0; j < n; j++)
        {
            // Fresh local on every iteration, so each lambda captures its own index.
            int x = i + j;

            // Skip slots past the end of the array (short last batch) and null entries.
            if (x >= files.Length || files[x] == null)
            {
                continue;
            }

            tasks.Add(Task.Factory.StartNew(() => function(files[x])));
        }

        if (tasks.Count == 0)
        {
            continue;
        }

        // Wait without a timeout for the whole batch before starting the next one.
        Task.WaitAll(tasks.ToArray());
        tasks.Clear();
    }

我尝试在索引上做得更整洁一些...

还要注意,内部循环中的 int x = i + j; 对于C#捕获lambda变量的方式非常重要。

如果问题是追踪索引算法,也许使用具有有意义名称的索引变量会更好?

    // Tasks of the batch currently running; taskCount is the batch size.
    List<Task> tasks = new List<Task>(taskCount);

    // filesIdx marks the first file of each batch.
    for (int filesIdx = 0; filesIdx < files.Length; filesIdx += taskCount)
    {
        // tasksIdx is the position of a file inside the current batch.
        for (int tasksIdx = 0; tasksIdx < taskCount; tasksIdx++)
        {
            // Per-iteration local, so every lambda captures a distinct index.
            int index = filesIdx + tasksIdx;
            bool hasFile = index < files.Length && files[index] != null;

            if (hasFile)
            {
                tasks.Add(Task.Factory.StartNew(() => function(files[index])));
            }
        }

        // Nothing was started for this batch, so there is nothing to wait for.
        if (tasks.Count == 0)
        {
            continue;
        }

        // Block, with no timeout, until every task of the batch has completed.
        Task[] pending = tasks.ToArray();
        Task.WaitAll(pending);
        tasks.Clear();
    }

错误1:'System.Threading.Tasks.Task'不包含“Run”的定义。我遇到了这个错误。延迟也是同样的问题。 - Tanay Narkhede
哦,我看到你正在使用dotnet 4...抱歉。我马上编辑。 - Yosef O
我修改了代码以支持dotnet 4(我最初是为dotnet 4.5编写的)。 - Yosef O
使用线程或任务应该同时获取getResponse(),那么这是什么问题呢?如果设置n = 1,它是否有效? - Yosef O
实际上,我想从用户那里获取一个变量n...就是他想一次性发起多少个请求。所以n可以是任何值。 - Tanay Narkhede
显示剩余2条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接