FastAPI的WebSocket无法处理大量的传入数据?

3
我使用Python和FastAPI创建了一个WebSocket服务器:https://fastapi.tiangolo.com/ 然后...它完成了,但是最后,我发现一个新的“错误”,让我感到有些恐慌。
当连接的客户端发送大量数据时,他突然会断开连接。我不确定是WebSocket代码本身、asyncio、uvicorn、Docker还是Linux服务器本身无法处理大量传入的数据。
这是我在Docker容器中运行的服务器代码:
import json
import time

import socketio

from fastapi import FastAPI, WebSocket
from fastapi.responses import HTMLResponse

from starlette.websockets import WebSocketDisconnect


from typing import List


app = FastAPI()


sio = socketio.AsyncServer(async_mode='asgi')
socket_app = socketio.ASGIApp(sio, static_files={'/': 'app.html'})
background_task_started = False


from streaming_class import repetisjons_klasse


import asyncio
from pydantic import BaseModel

class UserClientWebSocket(BaseModel):
    id: str
    ws: WebSocket

    class Config:
        arbitrary_types_allowed = True


class WebPageClientWebSocket(BaseModel):
    id: str
    ws: WebSocket

    class Config:
        arbitrary_types_allowed = True

class ConnectionManager:
    def __init__(self):

        self.active_user_client_connections: List[UserClientWebSocket] = []
        self.collect_user_IDs = []


        self.active_webpage_client_connections: List[WebPageClientWebSocket] = []
        self.collect_webpages_IDs = []


    async def connect_the_user_client(self, websocket: WebSocket, THE_USER_ID):
        await websocket.accept()
        await self.send_message_to_absolutely_everybody(f"User: {THE_USER_ID} connected to server!")
        print("User: {} ".format(THE_USER_ID) + " Connected")
        if THE_USER_ID not in self.collect_user_IDs:
            self.collect_user_IDs.append(THE_USER_ID)
        else:
            await self.send_message_to_absolutely_everybody(f"Somebody connected with the same ID as client: {THE_USER_ID}")
            await self.send_message_to_absolutely_everybody("but Vlori is a nice and kind guy, so he wil not get kicked :)")
            self.collect_user_IDs.append(THE_USER_ID)
        self.active_user_client_connections.append(UserClientWebSocket(ws=websocket, id=THE_USER_ID)) #SJEEKK DENNE LINJA !!!!!
        await self.show_number_of_clients()



    async def connect_the_webpage_client(self, websocket: WebSocket, the_webpage_id):
        await websocket.accept()
        await self.send_message_to_absolutely_everybody(f"User: {the_webpage_id} connected to server through webbrowser!")
        print("User: {} ".format(the_webpage_id) + " Connected")
        if the_webpage_id not in self.collect_webpages_IDs:
            self.collect_webpages_IDs.append(the_webpage_id)
        else:
            await self.send_message_to_absolutely_everybody(f"Somebody connected with the same ID as client: {the_webpage_id}")
            await self.send_message_to_absolutely_everybody("but Vlori is a nice and kind guy, so he wil not get kicked :)")
            self.collect_webpages_IDs.append(the_webpage_id)
        self.active_webpage_client_connections.append(WebPageClientWebSocket(ws=websocket, id=the_webpage_id)) #SJEEKK DENNE LINJA !!!!!
        await self.show_number_of_clients()

    async def disconnect_the_webpage_client(self, websocket: WebSocket, the_webpage_id):
        await websocket.close(code=1000)
        self.collect_webpages_IDs.remove(the_webpage_id)
        self.active_webpage_client_connections.remove(WebPageClientWebSocket(ws=websocket, id=the_webpage_id))
        await self.show_number_of_clients()


    async def disconnect_the_user_client(self, websocket: WebSocket, THE_USER_ID):
        await websocket.close(code=1000)
        self.collect_user_IDs.remove(THE_USER_ID)
        self.active_user_client_connections.remove(UserClientWebSocket(ws=websocket, id=THE_USER_ID))
        await self.show_number_of_clients()





"""
PROBLEM: THE USER GETS DISCONNECTED WHEN SENDING TO MUCH DATA IN REAL TIME!
"""

@app.websocket("/ws/testchannel")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    try:
        while True:
            data = await websocket.receive_text()
            print("Received data: {} ".format(data))
            #await websocket.send_text(f"you sent message: {data}")
            ¤await connection_manager.send_message_to_absolutely_everybody(data)

      
    except WebSocketDisconnect:
        print("client left chat.")

以下是我在Dockerfile中运行的设置(可能与此有关,但我不确定):
FROM ubuntu:latest
FROM python:3

MAINTAINER raxor2k "xxx.com"

RUN apt-get update -y

RUN apt-get install -y python3-pip build-essential python3-dev

COPY . /app
WORKDIR /app

RUN pip3 install --upgrade pip
RUN pip3 install -r requirements.txt
RUN pip3 install fastapi uvicorn #dennekanfjernes?

CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "80", "--reload"]

所以,这是我在本地计算机上运行的client.py(运行良好):

#client.py

from websocket import create_connection
import json
import time

ws = create_connection("ws://134.122.76.213:5080/ws/testchannel")

time.sleep(1)


def send_json_all_the_time(position):
    generate_json = { "machineID":"001", "RepSensor": position}
    send_json = json.dumps(generate_json)
    print("JSON SENT FROM IoT SENSOR: {}".format(send_json))
    time.sleep(0.1)
    ws.send(json.dumps(send_json))
    time.sleep(0.1)


while True:
    for x in range(1000):
        send_json_all_the_time(x)

    for x in range(100, -1, -1):
        ws.send("pause a little bit, starting again soon!")
        send_json_all_the_time(x)

我很困惑...我的服务器代码出了什么问题吗?是Linux服务器的问题吗?还是在Docker容器里出了问题?

非常感谢任何帮助!


通常情况下,不会单独运行uvicorn,因为它的文档建议在生产环境中使用gunicorn进行部署(我建议您使用tiangolo的官方镜像)。除此之外,导致应用程序断开连接的可能原因有很多。请提供客户端日志以供进一步分析。 - undefined
1
嗨。我的服务器没有任何问题,问题出在客户端,因为websocket客户端没有包含ping-pong同步。现在一切都正常运行 :) - undefined
好好知道!考虑回答自己的问题,这样我们就可以增加Stack Overflow的知识库。 - undefined
1个回答

2

编辑:经过一些研究、测试和FastAPI“论坛”https://gitter.im/tiangolo/fastapi的反馈,问题出在客户端上,由于缺少ping-pong同步。服务器端没有任何问题。


我遇到了同样的问题,你能详细解释一下这个解决方法吗? - undefined
哦,我修好了,客户端也需要实现乒乓同步。 - undefined
@Tom Chen 如何实现乒乓同步。此回答中分享的链接无法访问!!! - undefined
你用的是什么客户端包?我用的是websockets,它已经实现了ping-pong。 - undefined
是的,我也使用Websockets以及FastAPI。这是否意味着我不需要编写任何额外的代码来实现它呢? - undefined

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接