如何使用C#并行执行多个“ping”操作

13

我正在尝试计算一组服务器的平均往返时间。为了加快速度,我想并行执行ping操作。我编写了一个名为AverageRoundtripTime()的函数,它似乎可以工作,但由于我对多线程知之甚少,所以我想知道我所做的是否正确。请查看我的代码,让我知道这是否可行或者是否有更好的方法来实现我想要的功能:

public void Main()
{
    // Collection of hosts.
    List<String> hosts = new List<String>();

    // Add 100 hosts to the collection.
    for (Int32 i = 0; i < 100; ++i) hosts.Add("www.google.com");

    // Display the average round-trip time for 100 hosts.
    Console.WriteLine(AverageRoundtripTime(hosts));

}

public Double AverageRoundtripTime(IEnumerable<String> hosts)
{
    // Collection of threads.
    List<Thread> threads = new List<Thread>();

    // Collection of ping replies.
    List<PingReply> pingReplies = new List<PingReply>();

    // Loop through all host names.
    foreach (var host in hosts)
    {
        // Create a new thread.
        Thread thread = new Thread(() =>
        {
            // Variable to hold the ping reply.
            PingReply reply = null;

            // Create a new Ping object and make sure that it's 
            // disposed after we're finished with it.
            using (Ping ping = new Ping())
            {
                    reply = ping.Send(host);

            }

            // Get exclusive lock on the pingReplies collection.
            lock (pingReplies)
            {
                // Add the ping reply to the collection.
                pingReplies.Add(reply);

            }

        });

        // Add the newly created thread to the theads collection.
        threads.Add(thread);

        // Start the thread.
        thread.Start();

    }

    // Wait for all threads to complete
    foreach (Thread thread in threads)
    {
        thread.Join();

    }

    // Calculate and return the average round-trip time.
    return pingReplies.Average(x => x.RoundtripTime);

}

更新:

请查看我提出的一个相关问题:

Task Parallel Library代码在Windows窗体应用程序中冻结-作为Windows控制台应用程序正常工作


1
使用任务并行库。 - SLaks
我同意你的评论。唯一的问题是他使用的.NET版本。 - Pete
1
那么任务并行库就是前进的道路。 - Pete
@SLaks,你能给我一个例子吗? - HydroPowerDeveloper
6个回答

15

ping类有一个方法SendAsync,遵循事件驱动异步编程(EAP)模式。请查看此文章:http://msdn.microsoft.com/en-us/library/ee622454.aspx

这里有一个快速示例,演示了如何以非常基本的方式实现该文章。您可以随意调用此方法,所有ping都将以异步方式完成。

    class Program
    {
    public static string[] addresses = {"microsoft.com", "yahoo.com", "google.com"};
    static void Main(string[] args)
    {
        List<Task<PingReply>> pingTasks = new List<Task<PingReply>>();
        foreach (var address in addresses)
        {
            pingTasks.Add(PingAsync(address));
        }

        //Wait for all the tasks to complete
        Task.WaitAll(pingTasks.ToArray());

        //Now you can iterate over your list of pingTasks
        foreach (var pingTask in pingTasks)
        {
            //pingTask.Result is whatever type T was declared in PingAsync
            Console.WriteLine(pingTask.Result.RoundtripTime);
        }
        Console.ReadLine();
    }

    static Task<PingReply> PingAsync(string address)
    {
        var tcs = new TaskCompletionSource<PingReply>();
        Ping ping = new Ping();
        ping.PingCompleted += (obj, sender) =>
            {
                tcs.SetResult(sender.Reply);
            };
        ping.SendAsync(address, new object());
        return tcs.Task;
    }
}

谢谢,Pete...当我运行你的代码时,我的程序在Task.WaitAll(pingTasks.ToArray());处卡住了。我以前尝试过使用Task.WaitAll(),但它总是会卡住...有什么想法吗? - HydroPowerDeveloper
嘿,Pete,它在WaitAll()处永远冻结,除了你提供的代码之外没有其他运行的内容。我真的很想知道为什么它会冻结,以备将来参考。有任何想法吗? - HydroPowerDeveloper
@Pete...我成功地让你的代码运行了。我创建了一个新的WindowsConsole应用程序,一切都正常。我要感谢你回答我的问题。我的原始程序是一个WindowsForms应用程序...我试图在Windows表单情况下运行你的代码...我将创建另一个问题来处理这种情况。再次感谢! - HydroPowerDeveloper
好的,我会去看看其他的问题 ;) - Pete
在 foreach (var pingTask in pingTasks) 中是否有一种方法可以编写原始主机名,即 microsoft.com?最多只能编写IP地址。 - regeter
显示剩余7条评论

5
使用Parallel.For和ConcurrentBag
    static void Main(string[] args)
    {
        Console.WriteLine(AverageRoundTripTime("www.google.com", 100));
        Console.WriteLine(AverageRoundTripTime("www.stackoverflow.com", 100));
        Console.ReadKey();
    }

    static double AverageRoundTripTime(string host, int sampleSize)
    {
        ConcurrentBag<double> values = new ConcurrentBag<double>();
        Parallel.For(1, sampleSize, (x, y) => values.Add(Ping(host)));
        return values.Sum(x => x) / sampleSize;
    }
    static double Ping(string host)
    {
        var reply = new Ping().Send(host);
        if (reply != null)
            return reply.RoundtripTime;
        throw new Exception("denied");
    }

谢谢,shiznit...你的代码运行得非常好。有一个问题,似乎我的原始代码至少快了2倍...这是为什么呢?使用任务并行库是否存在额外开销? - HydroPowerDeveloper
从资源监视器和Parallel.For的文档来看,它似乎试图限制它将利用的线程数。在这个例子中,我的代码使用了大约20个线程,而您的实现和Pete的则创建了多达40个线程。 - agradl
感谢你的解释,我想知道为什么他们将Parallel.For线程限制在20个?我猜我得去阅读文档了。再次感谢! - HydroPowerDeveloper

4

// 使用LINQ可以使解决方案更简单

List<String> hosts = new List<String>();
for (Int32 i = 0; i < 100; ++i) hosts.Add("www.google.com");

var average = hosts.AsParallel().WithDegreeOfParallelism(64).
              Select(h => new Ping().Send(h).RoundtripTime).Average();


Console.WriteLine(average)

1
也许可以像这样使用 SendPingAsync
using (var ping = new Ping())
{
    var replies = await Task.WhenAll(hosts.Select(x => ping.SendPingAsync(x)))
                            .ConfigureAwait(false);
                            // false here   ^ unless you want to schedule back to sync context
    ... process replies.
}

0
一个解决方案:
internal class Utils
{
    internal static PingReply Ping (IPAddress address, int timeout = 1000, int ttl = 64)
    {
            PingReply tpr = null;
            var p = new Ping ();
            try {

                tpr = p.Send (address,
                    timeout,
                    Encoding.ASCII.GetBytes ("oooooooooooooooooooooooooooooooo"),
                    new PingOptions (ttl, true));

            } catch (Exception ex) {

                tpr = null;

            } finally {
                if (p != null)
                    p.Dispose ();

                p = null;
            }

            return tpr;
        }

        internal static List<PingReply> PingAddresses (List<IPAddress> addresses, int timeout = 1000, int ttl = 64)
        {
            var ret = addresses
                .Select (p => Ping (p, timeout, ttl))
                .Where (p => p != null)
                .Where (p => p.Status == IPStatus.Success)
                .Select (p => p).ToList ();

            return ret;
        }

        internal static Task PingAddressesAsync (List<IPAddress> addresses, Action<Task<List<PingReply>>> endOfPing, int timeout = 1000, int ttl = 64)
        {

            return Task.Factory.StartNew<List<PingReply>> (() => Utils.PingAddresses (
                addresses, timeout, ttl)).ContinueWith (t => endOfPing (t));

        }   
}

并使用:

Console.WriteLine ("start");

Utils.PingAddressesAsync (new List<IPAddress> () { 
                    IPAddress.Parse ("192.168.1.1"), 
                    IPAddress.Parse ("192.168.1.13"), 
                    IPAddress.Parse ("192.168.1.49"),
                    IPAddress.Parse ("192.168.1.200")
                }, delegate(Task<List<PingReply>> tpr) {

                    var lr = tpr.Result;
                    Console.WriteLine ("finish with " + lr.Count.ToString () + " machine found");

                    foreach (var pr in lr) {
                        Console.WriteLine (pr.Address.ToString ());
        }

});

Console.WriteLine ("execute");
Console.ReadLine ();

0

这是一个异步工作器,可以ping多个端点。您可以Start()或Stop()心跳工作器并订阅以下事件:

  • PingUp(当端点下线时边缘触发)
  • PingDown(当端点上线时边缘触发)
  • PulseStarted
  • PulseEnded
  • PingError

-

public class NetworkHeartbeat
{
    private static object lockObj = new object();

    public bool Running { get; private set; }
    public int PingTimeout { get; private set; }
    public int HeartbeatDelay { get; private set; }
    public IPAddress[] EndPoints { get; private set; }
    public int Count => EndPoints.Length;
    public PingReply[] PingResults { get; private set; }
    private Ping[] Pings { get; set; }

    public NetworkHeartbeat(IEnumerable<IPAddress> hosts, int pingTimeout, int heartbeatDelay)
    {
        PingTimeout = pingTimeout;
        HeartbeatDelay = heartbeatDelay;

        EndPoints = hosts.ToArray();
        PingResults = new PingReply[EndPoints.Length];
        Pings = EndPoints.Select(h => new Ping()).ToArray();
    }

    public async void Start()
    {
        if (!Running)
        {
            try
            {
                Debug.WriteLine("Heartbeat : starting ...");

                // set up the tasks
                var chrono = new Stopwatch();
                var tasks = new Task<PingReply>[Count];

                Running = true;

                while (Running)
                {
                    // set up and run async ping tasks                 
                    OnPulseStarted(DateTime.Now, chrono.Elapsed);
                    chrono.Restart();
                    for (int i = 0; i < Count; i++)
                    {
                        tasks[i] = PingAndUpdateAsync(Pings[i], EndPoints[i], i);
                    }
                    await Task.WhenAll(tasks);

                    for (int i = 0; i < tasks.Length; i++)
                    {
                        var pingResult = tasks[i].Result;

                        if (pingResult != null)
                        {
                            if (PingResults[i] == null)
                            {
                                if (pingResult.Status == IPStatus.Success)
                                    OnPingUp(i);
                            }
                            else if (pingResult.Status != PingResults[i].Status)
                            {
                                if (pingResult.Status == IPStatus.Success)
                                    OnPingUp(i);
                                else if (PingResults[i].Status == IPStatus.Success)
                                    OnPingDown(i);
                            }
                        }
                        else
                        {
                            if (PingResults[i] != null && PingResults[i].Status == IPStatus.Success)
                                OnPingUp(i);
                        }

                        PingResults[i] = tasks[i].Result;
                        Debug.WriteLine("> Ping [" + PingResults[i].Status.ToString().ToUpper() + "] at " + EndPoints[i] + " in " + PingResults[i].RoundtripTime + " ms");
                    }

                    OnPulseEnded(DateTime.Now, chrono.Elapsed);

                    // heartbeat delay
                    var delay = Math.Max(0, HeartbeatDelay - (int)chrono.ElapsedMilliseconds);
                    await Task.Delay(delay);
                }
                Debug.Write("Heartbeat : stopped");
            }
            catch (Exception)
            {
                Debug.Write("Heartbeat : stopped after error");
                Running = false;
                throw;
            }
        }
        else
        {
            Debug.WriteLine("Heartbeat : already started ...");
        }
    }

    public void Stop()
    {
        Debug.WriteLine("Heartbeat : stopping ...");
        Running = false;
    }

    private async Task<PingReply> PingAndUpdateAsync(Ping ping, IPAddress epIP, int epIndex)
    {
        try
        {
            return await ping.SendPingAsync(epIP, PingTimeout);
        }
        catch (Exception ex)
        {
            Debug.Write("-[" + epIP + "] : error in SendPing()");
            OnPingError(epIndex, ex);
            return null;
        }
    }

    // Event on ping errors
    public event EventHandler<PingErrorEventArgs> PingError;
    public class PingErrorEventArgs : EventArgs
    {
        public int EndPointIndex { get; private set; }
        public Exception InnerException { get; private set; }

        public PingErrorEventArgs(int epIndex, Exception ex)
        {
            EndPointIndex = epIndex;
            InnerException = ex;
        }
    }
    private void OnPingError(int epIndex, Exception ex) => PingError?.Invoke(this, new PingErrorEventArgs(epIndex, ex));

    // Event on ping Down
    public event EventHandler<int> PingDown;
    private void OnPingDown(int epIndex)
    {
        Debug.WriteLine("# Ping [DOWN] at " + EndPoints[epIndex]);
        PingDown?.Invoke(this, epIndex);
    }

    // Event on ping Up
    public event EventHandler<int> PingUp;
    private void OnPingUp(int epIndex)
    {
        Debug.WriteLine("# Ping [UP] at " + EndPoints[epIndex] );
        PingUp?.Invoke(this, epIndex);
    }

    // Event on pulse started
    public event EventHandler<PulseEventArgs> PulseStarted;
    public class PulseEventArgs : EventArgs
    {
        public DateTime TimeStamp { get; private set; }
        public TimeSpan Delay { get; private set; }

        public PulseEventArgs(DateTime date, TimeSpan delay)
        {
            TimeStamp = date;
            Delay = delay;
        }
    }
    private void OnPulseStarted(DateTime date, TimeSpan delay)
    {
        Debug.WriteLine("# Heartbeat [PULSE START] after " + (int)delay.TotalMilliseconds + " ms");
        PulseStarted?.Invoke(this, new PulseEventArgs(date, delay));
    }

    // Event on pulse ended
    public event EventHandler<PulseEventArgs> PulseEnded;
    private void OnPulseEnded(DateTime date, TimeSpan delay)
    {
        PulseEnded?.Invoke(this, new PulseEventArgs(date, delay));
        Debug.WriteLine("# Heartbeat [PULSE END] after " + (int)delay.TotalMilliseconds + " ms");
    }
} 

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接