Python 多线程服务器无法接收客户端数据。

3
在我们的课程范围内,我们的老师要求我们编写一个客户端-服务器程序,在此程序中,服务器将要相乘的两个矩阵分开,然后将它们发送给客户端,客户端应该计算它们的结果部分并将其发送回服务器。
我成功地将矩阵分成两部分并将其发送到客户端,但我的问题是客户端无法将结果发送回服务器。当我尝试在服务器端接收任何消息时,我的客户端不再接收矩阵进行计算。
以下是我的服务器代码:
!/usr/bin/env python
# -*- coding: utf-8 -*-
from socket import socket, AF_INET, SOCK_STREAM, timeout
from threading import Thread
import numpy as np
import pickle
buf = 4096
class ErrorLevels:
 
    OK = "OK"
    ERROR = "ERREUR"
 
 
class Server(Thread):
 
 
    def __init__(self):
 
        Thread.__init__(self)
 
        self.socket = socket(AF_INET, SOCK_STREAM)
        self.socket.bind(("localhost", 2020))
        self.socket.settimeout(0.5)
 
        self.running = False
        self.client_pool = []
 
    def client_handling_stopped(self, client, error_level, error_msg):
 
        print("Le gerant de {} s'est arrete avec le niveau d'erreur {} ({})".format(client.address[0],error_level,error_msg))
 
        self.clean_up()
 
        # self.log_connection_amount()
 
    def log_connection_amount(self):
 
        print("Il y a maintenant {} client(s) connecte(s)".format(len(self.client_pool)))
 
    def stop(self):
 
        print("Arrêt du serveur")
 
        for client in self.client_pool:
            client.close_connection()
 
        self.running = False
 
    def clean_up(self):
        """
        Enleve tous les gérants de clients innactifs de la liste des gerants de clients
        """
        self.client_pool = [client for client in self.client_pool if client.alive]
    #le serveur genere le calcul a envoyer aux clients
     #generation de matrices
    def matrice_aleatoire(self,intervalle, ligne, colonne):
        matrice = np.random.randint(intervalle, size=(ligne, colonne))
        return matrice


    def run(self):
        A = self.matrice_aleatoire(10,100,100)
        B = self.matrice_aleatoire(10,100,100)
#code fonctionnnant pour 10 clients
    #division de A  en 10 sous matrices de 10 lignes et envoie aux clients
        C = np.vsplit(A, 10)
            #dictionnaire a envoyer a chaque client
        data = []
        for i in range(10):
            dic = {'num':i,'partA':C[i],'partB':B}
            data.append(dic)
        print("Démarrage du serveur\nAttente des connexions clients...")
 
        self.running = True
 
        self.socket.listen(5)
        i=-1
 
        while self.running:
 
            try:
 
                client, address = self.socket.accept()
                i=i+1
            except timeout:
                continue # on retourne au début de la boucle jusqu'à avoir un client
 
            print("Connexion depuis {}".format(address))
            #envoie et reception du calcul aux clients connnectes
            #actuellement 10 clients
            client_handling = ClientHandling(client, address,data[i], self.client_handling_stopped)
            self.client_pool.append(client_handling)
            client_handling.start()
            
            
                
 
            # self.log_connection_amount()
 
 #classe d'ojbets thread pour gerer les connections clients
class ClientHandling(Thread):
 
    def __init__(self, client, address,data, exit_callback):
 
        Thread.__init__(self)
 
        self.client = client
        self.address = address
        self.data = data
        self.exit_callback = exit_callback # une fonction qui devra être appelée lorsque cet objet sera devenu inactif
        self.alive = True
 
    def _stop(self, error_level, error_msg):
 
        self.alive = False
        self.close_connection()
        self.exit_callback(self, error_level, error_msg)
 
    def close_connection(self):
 
        self.alive = False
        self.client.close()
        print("Fin de la communication avec {}".format(self.address))
   
   
    def run(self):
 
        try:
 #envoie du calcul
            print("debut envoie du calcul")
            data_string = pickle.dumps(self.data)
            self.client.sendall(data_string)
            print("fin envoie")
 #reception resultat
            ''' 
            here is the problem when i try to receive the result 
            pick_ = b''
            while 1:
                dat = self.client.recv(buf)
                pick_ += dat
                print("reception resultat")
                if not dat:break
            res = pickle.loads(dat)
            print("fin reception")
           # print(res)'''
            
  #quelques exceptions possibles
        except ZeroDivisionError:
 
            self._stop(ErrorLevels.ERROR, "Une division par zero tente")
 
        except ConnectionAbortedError:
 
            if self.alive: # innatendu
                self._stop(ErrorLevels.ERROR, "La connexion abandonnee")
 
            else: # on est dans le cas où le gérant est volontairement arrêté
                return # on arrête donc tout, plus besoin de faire quoi que ce soit
 
        self._stop(ErrorLevels.OK, "Le client a ferme la connection")
 
 
try:
 #lancement du thread serveur
    server = Server()
    server.start()
 
    while True: continue
 
except KeyboardInterrupt:
 
    server.stop()
    server.join()

这是我的client.py文件

import socket
from threading import Thread
#import numpy as np
import pickle
hote = "localhost"
port = 2020
buf = 4096
connexion_avec_serveur = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
connexion_avec_serveur.connect((hote, port))
print("Connexion établie avec le serveur sur le port {}".format(port))
#thread pour le calcul du client 
class Calcul(Thread):
    def __init__(self):
        Thread.__init__(self)
    #fonction qui extrait les donnees et multiplie
    def multmat(self,data):
        num = data['num']
        A = data['partA']
        B = data['partB']
        C = A @ B
        resul = {'num':num,'partC':C}
        return resul
    def run(self):
        #reception calcul
        pick_str = b''
        while 1:
            data = connexion_avec_serveur.recv(buf)
            pick_str += data
            if not data:break
            #connexion_avec_serveur.close()
        dic = pickle.loads(pick_str)
        #print(dic)
       #calcul du produit
        res = self.multmat(dic)
        print(res)
        #envoie du resultat du calcul
        data_string = pickle.dumps(res)
        connexion_avec_serveur.sendall(data_string)       
cal = Calcul()
cal.start()
cal.join()
connexion_avec_serveur.close() 

你的HTML和JavaScript代码是什么? - KaiserKatze
1个回答

1
主要问题是客户端不知道何时接收到了来自服务器的完整消息。接收代码期望在处理传入数据之前,服务器关闭连接。但是,服务器无法关闭连接,因为它正在等待客户端通过同一连接发送响应。
客户端在data = connexion_avec_serveur.recv(buf)处被阻塞,直到服务器关闭连接或发生其他网络事件断开该连接。同时,服务器也在dat = self.client.recv(buf)处被阻塞,等待客户端的响应 - 出现了死锁。
解决方案是安排客户端知道何时接收到了来自服务器的完整消息,这意味着添加一些协议。一种方法是让服务器附加一个标记值来表示消息的结束,并让客户端监视该标记。另一种方法是让服务器在消息前面添加有效负载的长度,在本例中为 pickled 数据的长度,我在这里展示。
对于客户端,请更改run()函数:
import struct

    def run(self):
        # First 4 bytes are the length of the payload
        data = connexion_avec_serveur.recv(4)
        msglen = struct.unpack('!L', data)[0]
        print(f'Length of payload {msglen = }')
        payload = []

        while msglen > 0:
            print(f'{msglen = } calling recv...')
            data = connexion_avec_serveur.recv(buf)
            print(f'received {len(data)} bytes')
            payload.append(data)
            msglen -= len(data)

        print(f'total bytes read {sum(len(s) for s in payload)}')
        dic = pickle.loads(b''.join(payload))
        #print(dic)
       #calcul du produit
        res = self.multmat(dic)
        print(res)
        #envoie du resultat du calcul
        data_string = pickle.dumps(res)
        connexion_avec_serveur.sendall(data_string)

而对于服务器:

import struct

    def run(self):

        try:
 #envoie du calcul
            print("debut envoie du calcul")
            data = pickle.dumps(self.data)
            # prepend message with length of the pickled data
            msg = struct.pack(f'!L{len(data)}s', len(data), data)
            print(f'sending {len(msg)} bytes to client')
            self.client.sendall(msg)

            print("fin envoie")
 #reception resultat

            pick_ = b''
            while True:
                print('calling recv()')
                dat = self.client.recv(buf)
                print(f'recv() returned {len(dat)} bytes')
                pick_ += dat
                print("reception resultat")
                if not dat:
                    break

            res = pickle.loads(pick_)
            print(f'{res = }')
            print("fin reception")

  #quelques exceptions possibles
        except ZeroDivisionError:

            self._stop(ErrorLevels.ERROR, "Une division par zero tente")

        except ConnectionAbortedError:

            if self.alive: # innatendu
                self._stop(ErrorLevels.ERROR, "La connexion abandonnee")

            else: # on est dans le cas où le gérant est volontairement arrêté
                return # on arrête donc tout, plus besoin de faire quoi que ce soit

        self._stop(ErrorLevels.OK, "Le client a ferme la connection")

非常感谢,现在它可以工作了。很明显我还有很多网络编程要学习。 - johncoded
没问题。处理低级套接字时需要考虑很多问题。您可能会想点赞并接受我的答案。 - mhawke

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接