使用Twisted Python编写UDP客户端和服务器

6
我希望创建一个使用Twisted发送和接收UDP数据包的服务器和客户端。我已经用Python套接字编写了这个程序,但是想利用Twisted的回调和线程功能。然而,我需要关于Twisted设计方面的帮助。
我有多种类型的数据包要接收,但假设只有一种:
class Packet(object):
    def __init__(self, data=None):
        self.packet_type = 1
        self.payload = ''
        self.structure = '!H6s'
        if data == None:
            return

        self.packet_type, self.payload = struct.unpack(self.structure, data)

    def pack(self):
        return struct.pack(self.structure, self.packet_type, self.payload)

    def __str__(self):
        return "Type: {0}\nPayload {1}\n\n".format(self.packet_type, self.payload)

我创建了一个协议类(几乎是直接复制示例),当我从另一个程序发送数据时,它似乎可以工作:

class MyProtocol(DatagramProtocol):
    def datagramReceived(self, data, (host, port)):
        p = Packet(data)
        print p

reactor.listenUDP(3000, MyProtocol())
reactor.run()

我不知道的是如何创建一个客户端,可以在网络上发送任意数据包,并被反应器捕捉:

# Something like this:
s = Sender()
p = Packet()
p.packet_type = 3
s.send(p.pack())
p.packet_type = 99
s.send(p.pack())

我还需要确保在客户端和服务器上设置复用地址标志,以便在同一设备上同时运行多个实例(例如,一个脚本正在发送心跳,另一个脚本正在响应心跳等)。请问有人能向我展示如何使用Twisted完成这个任务吗? 更新
以下是我在Python中使用套接字的方式。我可以同时运行多个监听器和发送器,并且它们都能相互通信。如何使用Twisted获得此结果?(监听部分不需要是单独的进程。)
class Listener(Process):
    def __init__(self, ip='127.0.0.1', port=3000):
        Process.__init__(self)
        self.ip = ip
        self.port = port

    def run(self):
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.bind((self.ip, self.port))

        data, from_ip = sock.recvfrom(4096)
        p = Packet(data)
        print p

class Sender(object):
    def __init__(self, ip='127.255.255.255', port=3000):
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.ip = (ip, port)

    def send(self, data):
        self.sock.sendto(data, self.ip)

if __name__ == "__main__":
    l = Listener()
    l.start()
    s = Sender()
    p = Packet()
    p.packet_type = 4
    p.payload = 'jake'
    s.send(p.pack())

可行解决方案:


class MySender(DatagramProtocol):
    def __init__(self, packet, host='127.255.255.255', port=3000):
        self.packet = packet.pack()
        self.host = host
        self.port = port

    def startProtocol(self):
        self.transport.write(self.packet, (self.host, self.port))

if __name__ == "__main__":
    packet = Packet()
    packet.packet_type = 1
    packet.payload = 'jake'

    s = MySender(packet)

    reactor.listenMulticast(3000, MyProtocol(), listenMultiple=True)
    reactor.listenMulticast(3000, s, listenMultiple=True)
    reactor.callLater(4, reactor.stop)
    reactor.run()
2个回答

12

就像上面的服务器示例一样,这里也有一个客户端示例。以下内容可以帮助您入门:

好的,这里是使用数据报协议发送和接收心跳的简单示例。

from twisted.internet.protocol import DatagramProtocol
from twisted.internet import reactor
from twisted.internet.task import LoopingCall
import sys, time

class HeartbeatSender(DatagramProtocol):
    def __init__(self, name, host, port):
        self.name = name
        self.loopObj = None
        self.host = host
        self.port = port

    def startProtocol(self):
        # Called when transport is connected
        # I am ready to send heart beats
        self.loopObj = LoopingCall(self.sendHeartBeat)
        self.loopObj.start(2, now=False)

    def stopProtocol(self):
        "Called after all transport is teared down"
        pass

    def datagramReceived(self, data, (host, port)):
        print "received %r from %s:%d" % (data, host, port)


    def sendHeartBeat(self):
        self.transport.write(self.name, (self.host, self.port))



class HeartbeatReciever(DatagramProtocol):
    def __init__(self):
        pass

    def startProtocol(self):
        "Called when transport is connected"
        pass

    def stopProtocol(self):
        "Called after all transport is teared down"


    def datagramReceived(self, data, (host, port)):
        now = time.localtime(time.time())  
        timeStr = str(time.strftime("%y/%m/%d %H:%M:%S",now)) 
        print "received %r from %s:%d at %s" % (data, host, port, timeStr)



heartBeatSenderObj = HeartbeatSender("sender", "127.0.0.1", 8005)

reactor.listenMulticast(8005, HeartbeatReciever(), listenMultiple=True)
reactor.listenMulticast(8005, heartBeatSenderObj, listenMultiple=True)
reactor.run()
广播示例只是修改了上述方法:
from twisted.internet.protocol import DatagramProtocol
from twisted.internet import reactor
from twisted.internet.task import LoopingCall
import sys, time

class HeartbeatSender(DatagramProtocol):
    def __init__(self, name, host, port):
        self.name = name
        self.loopObj = None
        self.host = host
        self.port = port

    def startProtocol(self):
        # Called when transport is connected
        # I am ready to send heart beats
        self.transport.joinGroup('224.0.0.1')
        self.loopObj = LoopingCall(self.sendHeartBeat)
        self.loopObj.start(2, now=False)

    def stopProtocol(self):
        "Called after all transport is teared down"
        pass

    def datagramReceived(self, data, (host, port)):
        print "received %r from %s:%d" % (data, host, port)


    def sendHeartBeat(self):
        self.transport.write(self.name, (self.host, self.port))



class HeartbeatReciever(DatagramProtocol):
    def __init__(self, name):
        self.name = name

    def startProtocol(self):
        "Called when transport is connected"
        self.transport.joinGroup('224.0.0.1')
        pass

    def stopProtocol(self):
        "Called after all transport is teared down"


    def datagramReceived(self, data, (host, port)):
        now = time.localtime(time.time())  
        timeStr = str(time.strftime("%y/%m/%d %H:%M:%S",now)) 
        print "%s received %r from %s:%d at %s" % (self.name, data, host, port, timeStr)



heartBeatSenderObj = HeartbeatSender("sender", "224.0.0.1", 8005)

reactor.listenMulticast(8005, HeartbeatReciever("listner1"), listenMultiple=True)
reactor.listenMulticast(8005, HeartbeatReciever("listner2"), listenMultiple=True)
reactor.listenMulticast(8005, heartBeatSenderObj, listenMultiple=True)
reactor.run()

我通过谷歌自己找到了这些例子,但它们并没有解决我遇到的问题。 - Jake
@Jake 这是否解决了套接字重用的问题,还是你在寻找其他东西? - pyfunc
+1 这个可以用,但是因为它只使用了组播,只有一个监听反应器正在接收发送方发出的数据。这让我离我想要的东西更近了一些,也就是向所有正在监听的客户端进行广播。(您应该保留此示例,以供寻找多播的人使用!) - Jake
1
我在使用listenMulticast时,设置了listenMultiple=True属性,这允许地址重用。否则,在使用listenUDP时,无法采取其他方法。 - pyfunc
请问 'name'、'host' 和 'port' 的类型是什么? - Christopher Pisz
显示剩余2条评论

2
请查看echoclient_udp.py示例。
由于UDP在客户端和服务器之间相当对称,因此您只需在那里运行reactor.listenUDPconnect到服务器(这实际上只是设置发送数据包的默认目标),然后使用transport.write发送您的数据包。

你是在建议我调用reactor.listenUDP两次(一次用于服务器,一次用于客户端),然后再调用reactor.run吗?我无法尝试这样做,因为我还没有设置重用地址,所以我不知道那是否有效。 - Jake
我建议您在每个套接字上分别进行一次侦听,可能是在单独的进程中,然后在每个进程中运行reactor.run。 您需要为每个进程拥有不同的(ip,port)组合。 我不明白reuseaddr在这方面的作用是什么? - poolie

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接