测量服务器的ping延迟 - Python

14

我有一份服务器IP地址列表,需要检查每个地址是否在线以及延迟多长时间。

目前还没有找到一个简单的实现方法,并且在精确计算延迟方面存在一些问题。


有什么想法吗?


1
这里有一些有用的技巧:https://dev59.com/UHRC5IYBdhLWcg3wYP6h - Dan
@Dan:它们都调用操作系统命令... - RadiantHex
实际上,那个帖子的最佳答案是一个纯Python实现,看起来正好符合你的要求。 - FogleBird
@Fogle:我不知道我怎么错过了它...谢谢。 - RadiantHex
6个回答

8

如果您已经熟悉解析字符串,您可以使用subprocess模块将所需数据获取为字符串,如下所示:

>>> import subprocess
>>> p = subprocess.Popen(["ping.exe","www.google.com"], stdout = subprocess.PIPE)
>>> print p.communicate()[0]

Pinging www.l.google.com [209.85.225.99] with 32 bytes of data:

Reply from 209.85.225.99: bytes=32 time=59ms TTL=52
Reply from 209.85.225.99: bytes=32 time=64ms TTL=52
Reply from 209.85.225.99: bytes=32 time=104ms TTL=52
Reply from 209.85.225.99: bytes=32 time=64ms TTL=52

Ping statistics for 209.85.225.99:
    Packets: Sent = 4, Received = 4, Lost = 0 (0% loss),
Approximate round trip times in milli-seconds:
    Minimum = 59ms, Maximum = 104ms, Average = 72ms

5
这并不是一个很好的方法,因为它显然会针对特定平台(但实际上并没有必要这样做)。 - Kat
2
如果我们想将响应时间作为变量检索,该怎么办? - Stevoisiak
2
这就是“如果你熟悉解析”的地方……可以从类似以下的东西开始:import re; timestr = re.compile("time=[0-9]+ms").findall(str(p.communicate()[0])),然后再进行优化……正则表达式模块(re)非常棒。 - K. Brafford
1
timestr ['时间=12毫秒', '时间=12毫秒', '时间=14毫秒', '时间=13毫秒']
- K. Brafford

6

按照hlovdal的建议,与fping一起使用,在这里是我用于测试代理的解决方案。我只在Linux下尝试过它。如果无法测量ping时间,则返回一个大值。用法:print get_ping_time('<ip>:<port>')

import shlex  
from subprocess import Popen, PIPE, STDOUT

def get_simple_cmd_output(cmd, stderr=STDOUT):
    """
    Execute a simple external command and get its output.
    """
    args = shlex.split(cmd)
    return Popen(args, stdout=PIPE, stderr=stderr).communicate()[0]

def get_ping_time(host):
    host = host.split(':')[0]
    cmd = "fping {host} -C 3 -q".format(host=host)
    res = [float(x) for x in get_simple_cmd_output(cmd).strip().split(':')[-1].split() if x != '-']
    if len(res) > 0:
        return sum(res) / len(res)
    else:
        return 999999

我认为这段代码中的 process. 是一个打字错误。 - kaptan
这个请求在超时之前等待多久? - Stevoisiak
1
注意:在Python 3中,Popen的结果需要进行解码:Popen(args, stdout=PIPE, stderr=stderr).communicate()[0].decode() - maininformer

5

这篇文章有些陈旧,我认为现在存在更好的方法。虽然我是Python新手,但在我的项目中我做了如下处理:

from pythonping import ping

def ping_host(host):
    ping_result = ping(target=host, count=10, timeout=2)

    return {
        'host': host,
        'avg_latency': ping_result.rtt_avg_ms,
        'min_latency': ping_result.rtt_min_ms,
        'max_latency': ping_result.rtt_max_ms,
        'packet_loss': ping_result.packet_loss
    }

hosts = [
    '192.168.48.1',
    '192.168.48.135'
]

for host in hosts:
    print(ping_host(host))

结果:

{'host': '192.168.48.1', 'avg_latency': 2000.0, 'min_latency': 2000, 'max_latency': 2000, 'packet_loss': 1.0}
{'host': '192.168.48.135', 'avg_latency': 42.67, 'min_latency': 41.71, 'max_latency': 44.17, 'packet_loss': 0.0}

您可以在此处找到pythonping库:https://pypi.org/project/pythonping/

我尝试执行 ping(target=host, count=10, timeout=2) 时出现错误:File "/usr/local/Cellar/python@3.7/3.7.9_2/Frameworks/Python.framework/Versions/3.7/lib/python3.7/socket.py", line 151, in __init__ _socket.socket.__init__(self, family, type, proto, fileno) PermissionError: [Errno 1] Operation not permitted 你能检查一下你的代码是否仍然可以运行吗? - miguelmorin
1
它对我仍然有效,我使用Python 3.9。 - Allan
你需要像 https://dev59.com/CKLia4cB1Zd3GeqPeier 中建议的那样以 root 权限运行吗? - miguelmorin
1
我正在Windows 10上运行它,但我不是计算机的管理员。 - Allan
如果我以root身份运行,它可以工作。但我想避免这种情况,所以我会找另一种方法。 - miguelmorin

3
如果您想避免实现所有网络通信细节,可能可以尝试在fping的基础上构建某些东西:

fping是一个类似程序,它使用Internet控制消息协议(ICMP)回显请求来确定目标主机是否响应。fping与ping不同之处在于您可以在命令行上指定任意数量的目标,或者指定包含要ping的目标列表的文件。 fping不会像ping一样发送到一个目标直到超时或回复,而是以轮换方式发送ping数据包并移动到下一个目标。


3
谢谢,运作良好。此外,我想指出,“与ping不同,fping旨在用于脚本,因此其输出被设计为易于解析”。 - Jabba
有没有办法手动指定两个IP地址,并使用fping找到它们之间的延迟? - Prafulla Kumar Sahu
你能提供一个如何使用fping的例子吗? - Stevoisiak

2

https://github.com/matthieu-lapeyre/network-benchmark 这是基于FlipperPA的工作的解决方案:https://github.com/FlipperPA/latency-tester

import numpy
import pexpect


class WifiLatencyBenchmark(object):
    def __init__(self, ip):
        object.__init__(self)

        self.ip = ip
        self.interval = 0.5

        ping_command = 'ping -i ' + str(self.interval) + ' ' + self.ip
        self.ping = pexpect.spawn(ping_command)

        self.ping.timeout = 1200
        self.ping.readline()  # init
        self.wifi_latency = []
        self.wifi_timeout = 0

    def run_test(self, n_test):
        for n in range(n_test):
            p = self.ping.readline()

            try:
                ping_time = float(p[p.find('time=') + 5:p.find(' ms')])
                self.wifi_latency.append(ping_time)
                print 'test:', n + 1, '/', n_test, ', ping latency :', ping_time, 'ms'
            except:
                self.wifi_timeout = self.wifi_timeout + 1
                print 'timeout'

        self.wifi_timeout = self.wifi_timeout / float(n_test)
        self.wifi_latency = numpy.array(self.wifi_delay)

    def get_results(self):
        print 'mean latency', numpy.mean(self.wifi_latency), 'ms'
        print 'std latency', numpy.std(self.wifi_latency), 'ms'
        print 'timeout', self.wifi_timeout * 100, '%'


if __name__ == '__main__':
    ip = '192.168.0.1'
    n_test = 100

    my_wifi = WifiLatencyBenchmark(ip)

    my_wifi.run_test(n_test)
    my_wifi.get_results()

Github代码库: https://github.com/matthieu-lapeyre/network-benchmark

1

感谢Jabba的帮助,但是那段代码对我来说并没有正确运行,所以我进行了一些更改,如下所示:

import shlex
from subprocess import Popen, PIPE, STDOUT


def get_simple_cmd_output(cmd, stderr=STDOUT):
    """
    Execute a simple external command and get its output.
    """
    args = shlex.split(cmd)
    return Popen(args, stdout=PIPE, stderr=stderr).communicate()[0]


def get_ping_time(host):
    host = host.split(':')[0]
    cmd = "fping {host} -C 3 -q".format(host=host)
    # result = str(get_simple_cmd_output(cmd)).replace('\\','').split(':')[-1].split() if x != '-']
    result = str(get_simple_cmd_output(cmd)).replace('\\', '').split(':')[-1].replace("n'", '').replace("-",
                                                                                                        '').replace(
        "b''", '').split()
    res = [float(x) for x in result]
    if len(res) > 0:
        return sum(res) / len(res)
    else:
        return 999999


def main():
    # sample hard code for test
    host = 'google.com'
    print([host, get_ping_time(host)])

    host = 'besparapp.com'
    print([host, get_ping_time(host)])



if __name__ == '__main__':
    main()

这个在Windows上可行吗?还是只能在Linux上使用? - Stevoisiak

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接