如何独立运行WebSockets

3
我尝试启动Binance Websocket来收集蜡烛图数据。如果数据处理函数没有延迟,它可以正常工作。但是当处理一个交易对数据时出现暂停,它也会延迟其他交易对的响应。有人知道如何使它们独立运行吗?
from binance.client import Client
from binance.websockets import BinanceSocketManager

api_key = ''
api_secret = ''
client = Client(api_key, api_secret)
bm = BinanceSocketManager(client, user_timeout=60)

def process(msg):
    print(msg['s'])
    if msg['s'] == 'ETHUSDT':
        time.sleep(5)

def socket_1():
     conn_key = bm.start_kline_socket('ETHUSDT', process, '1h')

def socket_2():
     conn_key = bm.start_kline_socket('BNBUSDT', process, '1h')

socket_1()
socket_2()

bm.start()

我尝试使用asyncio来使套接字运行两个不同的任务,就像@Mike Malyi建议的那样,但这并没有消除延迟:

import asyncio

def process(msg):
    asyncio.run(main(msg))

async def main(msg):
    if msg['s'] == 'ETHUSDT':
        task1 = asyncio.create_task(process_message(msg))
        await task1
    else:
        task2 = asyncio.create_task(process_message(msg))
        await task2

async def process_message(msg):
    print(msg['s'])
    if msg['s'] == 'ETHUSDT':
        await asyncio.sleep(5)

eth_key = bm.start_kline_socket('ETHUSDT', process, '1h')
bnb_key = bm.start_kline_socket('BNBUSDT', process, '1h')

bm.start()

我也尝试使用Queuethreads让函数独立运行,但并没有帮助,一个函数仍然会延迟另一个函数:

from queue import Queue

def consumer(in_q):
    while True:
        msg = in_q.get()
        process_message(msg)
    
def producer(out_q):
    eth = bm.start_kline_socket('ETHUSDT', out_q.put, '1h')
    bnb = bm.start_kline_socket('BNBUSDT', out_q.put, '1h')

def process_message(msg):
    if msg['s'] == 'ETHUSDT':
        time.sleep(5)
        print(f"{msg['s']} with delay, {time.strftime('%X')}")
    else:
        print(f"{msg['s']} {time.strftime('%X')}")


q = Queue()
t1 = Thread(target = consumer, args =(q, )) 
t2 = Thread(target = producer, args =(q, )) 
t1.start() 
t2.start() 

bm.start() 
2个回答

2
from binance.client import Client
from binance.websockets import BinanceSocketManager
import _thread as thread
import time
import queue

api_key = ''
api_secret = ''
client = Client(api_key, api_secret)

def process_message(msg):
    if msg['s'] == 'ETHUSDT':
      print(f"{msg['s']} with delay, {time.strftime('%X')}")
      time.sleep(5)
      print('delay end')  
    else:
        print(f"{msg['s']} {time.strftime('%X')}")
  

def build_thread (symbol):
  print('start thread', symbol)
  q = queue.Queue()
  bm = BinanceSocketManager(client, user_timeout=60)
  conn_key = bm.start_kline_socket(symbol, q.put, '1h')
  bm.start()
  while(True):
    msg = q.get()
    process_message(msg)

thread.start_new_thread(build_thread, ('ETHUSDT', ))  
thread.start_new_thread(build_thread, ('BNBUSDT', ))  

1
@RuslanAsadullin 没错。这是因为你使用了await task1。只需删除这行代码即可。你的脚本仍会等待这行代码执行完毕,直到process_message被解决。 - undefined
1
是的,即使之前的一个实例没有启动,它也可以启动另一个函数实例。这不是你的问题吗?那么你想要实现什么?你在"有人知道如何独立运行它们吗?"这句话中指的是什么? - undefined
1
eth_key = bm.start_kline_socket('ETHUSDT', lambda msg: asyncio.run(main(msg)), '1h') 尝试类似这样的方式。Lambda函数应该为每次运行的main创建独立实例。 - undefined
我按照你的建议去做了,但是所有功能仍然普遍存在延迟。感谢你的时间,@MikeMalyi。 - undefined
1
我不想放弃 :-). 切换到线程 https://dev59.com/dHA85IYBdhLWcg3wD_S8 - undefined
显示剩余6条评论

0

这是设置从SQL获取交易对和止损水平(为了让代码正常工作,这里使用了内联查询),并在止损水平低于收盘价时停止套接字的过程。每个交易对都在自己的进程中运行,因此可以根据可用的CPU线程数量进行扩展。

import config
from binance import ThreadedWebsocketManager
from datetime import datetime
import pyodbc
from multiprocessing import Pool, cpu_count

KEY = config.binance_key
SECRET = config.binance_secret
BASE_URL = config.binance_base_url

''' ======  begin of functions ====== '''
def exec_sql (query) :
    cnxn_p = pyodbc.connect(config.sql_connection)
    cursor_p = cnxn_p.cursor()
    cursor_p.execute(query)
    cnxn_p.commit()
    cursor_p.close()
    cnxn_p.close()
    
def process_message(pair,stop):
    print(pair)
    print(stop)

    twm = ThreadedWebsocketManager(api_key=KEY, api_secret=SECRET)
    # start is required to initialise its internal loop
    twm.start()

    def handle_socket_message(msg):
        transactiontime = msg['k']['T'] / 1000
        transactiontime = datetime.fromtimestamp(transactiontime).strftime('%d %b %Y %H:%M:%S')

        if msg['e'] != 'error':
            # print("{} - {} - Interval {} - Open: {} - Close: {} - High: {} - Low: {} - Volume: {}".
            #      format(transactiontime,msg['s'],msg['k']['i'],msg['k']['o'],msg['k']['c'],msg['k']['h'],msg['k']['l'],msg['k']['v']))
            print("{} - {} - Interval {} - Close: {} - Stop: {}".
                 format(transactiontime,msg['s'],msg['k']['i'],msg['k']['c'], stop ))
        else:
            print(msg)

        Close = float(msg['k']['c'])
        if Close < stop:
            print(pair + ' close is below Stop')
            twm.stop()

    twm.start_kline_socket(callback=handle_socket_message, symbol=pair)
    twm.join()  

def main():
       
    print(f'starting computations on {cpu_count()} cores')

    # connect SQL server
    cnxn = pyodbc.connect(config.sql_connection)
    cursor = cnxn.cursor()
    sql = """select 'BNBBTC' as pair, 0.01086300 as stop
            union
            select 'BTCUSDT', 56234"""
    cursor.execute(sql)
    
    # iterate pairs
    rows = cursor.fetchall()
    pairs = []
    stops = []
    for row in rows:       
        pairs.append(row.pair)
        stops.append(row.stop)

    with Pool() as pool:
        pool.starmap(process_message, zip(pairs,stops))
    pool.close()

    print('pool done')    

    
if __name__ == '__main__':
    main()

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接