如何在Python中检测BinanceSocketManager WebSocket是否断开连接?

6

Binance API和python-binance提供异步(async)功能以进行非阻塞执行,如Binance的Async基础知识所讨论。

我正在使用BinanceSocketManager(异步非阻塞)监听通过websocket传输的实时数据。

在网络断网的情况下,我希望将自动重新连接的功能添加到我的项目中。但是我无法在BinanceSocketManager中找到任何信息。我只能找到指南,它使用了ThreadedWebsocketManager,但这不是一个异步实现。

有人知道如何实现Binance websocket断开检测和自动重新连接机制吗?

这里是我目前的一些代码:

import asyncio
from binance import AsyncClient, BinanceSocketManager


async def main():
    client = await AsyncClient.create()
    await kline_listener(client)

async def kline_listener(client):
    bm = BinanceSocketManager(client)
    async with bm.kline_socket(symbol='BTCUSDT') as stream:
        while True:
            res = await stream.recv()
            print(res)
    # a way detect websocket error/disconnect, callback 'disconnect_callback'

async def disconnect_callback():
    await client.close_connection()
    await main()  # restart client and kline socket

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
2个回答

4

如果有其他人在查看这个问题,针对这个问题,您应该查看BinanceAPIException。代码可能如下所示:

from binance import AsyncClient, BinanceSocketManager
from binance.exceptions import BinanceAPIException

async def main():

    client = await AsyncClient.create()
    bm = BinanceSocketManager(client, user_timeout=60)

    # start any sockets here, i.e a trade socket
    kline_candles = bm.kline_socket('BNBUSDT', interval=client.KLINE_INTERVAL_1MINUTE)

    # start receiving messages
    try:
        status = await client.get_system_status()
        print(status['msg'])

        async with kline_candles as stream:
            for _ in range(5):
                res = await stream.recv()  # create/await response
                await process_message(msg=res, client=client)  # process message
            
    except BinanceAPIException as e:
        print(e)
        await disconnect_callback(client=client)

async def disconnect_callback(client):
    await client.close_connection()  # close connection
    time.sleep(60)  # wait a minute before restarting
    await main()  # restart client and kline socket

async def process_message(msg, client):
    if msg['e'] == 'error':
        await disconnect_callback(client=client)

        print('ERROR OCCURED')
        
    else:
        candle = msg['k']  # get only the candle info within the general dict

        start_time = datetime.utcfromtimestamp(candle['t']/1000).strftime('%Y-%m-%d %H:%M:%S')
        close_time = datetime.utcfromtimestamp(candle['T']/1000).strftime('%Y-%m-%d %H:%M:%S')

        print(f'__ start: {start_time}, close: {close_time}')
        print(msg)

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

还没有测试过这种断开连接的情况,但我认为这样做会起作用。如果有人有其他的笔记,请告诉我。


1

我已经测试了上述代码,证明它相当稳定。以下是我所做的一些改进。

如果在执行此行时您的互联网连接完全中断,我不确定会发生什么:

client = await AsyncClient.create()

这个问题可能可以这样解决(如果有更好的想法,我也很乐意听取):

        while True:
            try:
                client = await AsyncClient.create()
            except Exception as error_msg:
                print(f"error: {error_msg}")
                # should we add a sleep here?
                # time.sleep(3)
            else:
                print("finally got through the loop")
                break

用try/except将其包围是个好主意:

bm = BinanceSocketManager(client, user_timeout=60)

应该在调用stream.recv()时加上asyncio.wait_for(),以覆盖长时间没有数据传入的情况。这通常意味着出现了问题。
    async with kline_candles as stream:
            for _ in range(5):
                try:
                    res = await asyncio.wait_for(stream.recv(), timeout=60)  # create/await response
                    await process_message(msg=res, client=client)  # process message
                except (asyncio.TimeoutError, websockets.exceptions.ConnectionClosed, asyncio.exceptions.CancelledError, asyncio.exceptions.TimeoutError) as error_msg_1:
                    print(f"Error! in main loop 1:\n{error_msg_1}")
                    await disconnect_callback(client=client)

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接