有没有人可以推荐一个适用于Python的Socket.IO客户端库? 我已经找了一些,但是我发现它们要么是服务器实现,要么依赖于像Twisted这样的框架。
我需要一个没有其他框架依赖的客户端库。
仅仅使用其中的某一种连接类型是不够的,因为Python客户端需要与多个socketio服务器通信,其中许多服务器不支持WebSockets。
有没有人可以推荐一个适用于Python的Socket.IO客户端库? 我已经找了一些,但是我发现它们要么是服务器实现,要么依赖于像Twisted这样的框架。
我需要一个没有其他框架依赖的客户端库。
仅仅使用其中的某一种连接类型是不够的,因为Python客户端需要与多个socketio服务器通信,其中许多服务器不支持WebSockets。
Archie1986的答案曾经很好,但在socketio更新后已经过时(更具体地说,它的协议:https://github.com/LearnBoost/socket.io-spec)。据我所知,您需要在请求传输连接(例如websockets)之前手动执行握手...请注意,以下代码不完整且不安全...首先,它忽略了握手响应中返回的支持传输方式列表,并始终尝试获取websocket...另外,它假设握手始终成功...尽管如此,它是一个很好的起点
import websocket, httplib
...
'''
connect to the socketio server
1. perform the HTTP handshake
2. open a websocket connection '''
def connect(self) :
conn = httplib.HTTPConnection('localhost:8124')
conn.request('POST','/socket.io/1/')
resp = conn.getresponse()
hskey = resp.read().split(':')[0]
self._ws = websocket.WebSocket(
'ws://localhost:8124/socket.io/1/websocket/'+hskey,
onopen = self._onopen,
onmessage = self._onmessage)
....
你可能还想了解一下python-websockets:https://github.com/mtah/python-websocket
pip install -e git+https://github.com/liris/websocket-client.git#egg=websocket
完成后,请尝试以下操作,将 SOCKET_IO_HOST
和 SOCKET_IO_PORT
替换为 Socket.IO 服务器的适当位置:import websocket
SOCKET_IO_HOST = "127.0.0.1"
SOCKET_IO_PORT = 8080
socket_io_url = 'ws://' + SOCKET_IO_HOST + ':' + str(SOCKET_IO_PORT) + '/socket.io/websocket'
ws = websocket.create_connection(socket_io_url)
现在你可以使用Python直接与Socket.IO服务器进行交互。要将消息发送到Socket.IO服务器,只需通过此WebSocket连接发送消息即可。为了使Socket.IO服务器能够正确解释从Python Socket.IO客户端通过该WebSocket收到的传入消息,您需要遵守Socket.IO协议,并对可能通过WebSocket连接发送的任何字符串或字典进行编码。例如,在完成上述所有操作后,执行以下操作:
def encode_for_socketio(message):
"""
Encode 'message' string or dictionary to be able
to be transported via a Python WebSocket client to
a Socket.IO server (which is capable of receiving
WebSocket communications). This method taken from
gevent-socketio.
"""
MSG_FRAME = "~m~"
HEARTBEAT_FRAME = "~h~"
JSON_FRAME = "~j~"
if isinstance(message, basestring):
encoded_msg = message
elif isinstance(message, (object, dict)):
return encode_for_socketio(JSON_FRAME + json.dumps(message))
else:
raise ValueError("Can't encode message.")
return MSG_FRAME + str(len(encoded_msg)) + MSG_FRAME + encoded_msg
msg = "Hello, world!"
msg = encode_for_socketio(msg)
ws.send(msg)
带有回调的发射。
from socketIO_client import SocketIO
def on_bbb_response(*args):
print 'on_bbb_response', args
with SocketIO('localhost', 8000) as socketIO:
socketIO.emit('bbb', {'xxx': 'yyy'}, on_bbb_response)
socketIO.wait_for_callbacks(seconds=1)
定义事件。
from socketIO_client import SocketIO
def on_aaa_response(*args):
print 'on_aaa_response', args
socketIO = SocketIO('localhost', 8000)
socketIO.on('aaa_response', on_aaa_response)
socketIO.emit('aaa')
socketIO.wait(seconds=1)
在命名空间中定义事件。
from socketIO_client import SocketIO, BaseNamespace
class Namespace(BaseNamespace):
def on_aaa_response(self, *args):
print 'on_aaa_response', args
self.emit('bbb')
socketIO = SocketIO('localhost', 8000)
socketIO.define(Namespace)
socketIO.emit('aaa')
socketIO.wait(seconds=1)
在单个socket上定义不同的命名空间。
from socketIO_client import SocketIO, BaseNamespace
class ChatNamespace(BaseNamespace):
def on_aaa_response(self, *args):
print 'on_aaa_response', args
class NewsNamespace(BaseNamespace):
def on_aaa_response(self, *args):
print 'on_aaa_response', args
socketIO = SocketIO('localhost', 8000)
chatNamespace = socketIO.define(ChatNamespace, '/chat')
newsNamespace = socketIO.define(NewsNamespace, '/news')
chatNamespace.emit('aaa')
newsNamespace.emit('aaa')
socketIO.wait(seconds=1)
socketIO.on('aaa_response', on_aaa_response)
。如果我添加多个事件,只有第一个会起作用吗? - Wazy使用流行的异步Tornado Web Server,SocketTornad.IO库也是Python的可选项之一。
我写了一个: https://github.com/amitu/amitu-websocket-client/blob/master/amitu/socketio_client.py。它只支持WebSockets,因此对您可能只有较小的实用性。
run()
方法时出现错误:无效的帧
- 这可能是什么意思? - Alp