使用TcpClient/Socket安全地传输/发送大数据包

3
我正在编写一个基于TCP的客户端,需要发送和接收数据。我已经使用.NET Framework提供的Socket类的异步编程模型(APM)。
连接到套接字后,我使用BeginReceive等待套接字上的数据。
现在,在等待套接字上的数据时,我可能需要通过套接字发送数据。并且可以多次调用send方法,
因此,我必须确保:
- 所有来自先前Send调用的字节都被完全发送。 - 发送数据的方式是安全的,考虑到在数据发送正在进行时,任何对发送数据的调用都可能被执行。
这是我第一次使用套接字,所以我的发送数据的方法正确吗?
    private readonly object writeLock = new object();
    public void Send(NetworkCommand cmd)
    {
        var data = cmd.ToBytesWithLengthPrefix();
        ThreadPool.QueueUserWorkItem(AsyncDataSent, data);
    }

    private int bytesSent;
    private void AsyncDataSent(object odata)
    {
        lock (writeLock)
        {
            var data = (byte[])odata;
            int total = data.Length;
            bytesSent = 0;
            int buf = Globals.BUFFER_SIZE;
            while (bytesSent < total)
            {
                if (total - bytesSent < Globals.BUFFER_SIZE)
                {
                    buf = total - bytesSent;
                }
                IAsyncResult ar = socket.BeginSend(data, bytesSent, buf, SocketFlags.None, DataSentCallback, data);
                ar.AsyncWaitHandle.WaitOne();
            }
        }
    }

对象如何转换为byte[],有时NetworkCommand可能会达到0.5 MB大小。

    public byte[] ToBytesWithLengthPrefix()
    {
        var stream = new MemoryStream();
        try
        {
            Serializer.SerializeWithLengthPrefix(stream, this, PrefixStyle.Fixed32);
            return stream.ToArray();
        }
        finally
        {
            stream.Close();
            stream.Dispose();
        }
    }

完整的类

namespace Cybotech.Network
{
    public delegate void ConnectedDelegate(IPEndPoint ep);
    public delegate void DisconnectedDelegate(IPEndPoint ep);
    public delegate void CommandReceivedDelagate(IPEndPoint ep, NetworkCommand cmd);
}


using System;
using System.Net;
using System.Net.Sockets;
using Cybotech.Helper;
using Cybotech.IO;

namespace Cybotech.Network
{
    public class ClientState : IDisposable
    {
        private int _id;
        private int _port;
        private IPAddress _ip;
        private IPEndPoint _endPoint;
        private Socket _socket;
        private ForwardStream _stream;
        private byte[] _buffer;

        public ClientState(IPEndPoint endPoint, Socket socket)
        {
            Init(endPoint, socket);
        }

        private void Init(IPEndPoint endPoint, Socket socket)
        {
            _endPoint = endPoint;
            _ip = _endPoint.Address;
            _port = _endPoint.Port;
            _id = endPoint.GetHashCode();
            _socket = socket;
            _stream = new ForwardStream();
            _buffer = new byte[Globals.BUFFER_SIZE];
        }

        public int Id
        {
            get { return _id; }
        }

        public int Port
        {
            get { return _port; }
        }

        public IPAddress Ip
        {
            get { return _ip; }
        }

        public IPEndPoint EndPoint
        {
            get { return _endPoint; }
        }

        public Socket Socket
        {
            get { return _socket; }
        }

        public ForwardStream Stream
        {
            get { return _stream; }
        }

        public byte[] Buffer
        {
            get { return _buffer; }
            set { _buffer = value; }
        }

        protected virtual void Dispose(bool disposing)
        {
            if (disposing)
            {
                if (_stream != null)
                {
                    _stream.Close();
                    _stream.Dispose();
                }

                if (_socket != null)
                {
                    _socket.Close();
                }
            }
        }

        public void Dispose()
        {
            Dispose(true);
        }
    }
}

using System;
using System.Collections.Generic;
using System.Net;
using System.Net.Sockets;
using Cybotech.Command;
using Cybotech.Network;

namespace ExamServer.Network
{
    public class TcpServer : IDisposable
    {

        private Socket socket;
        private bool secure;

        private readonly Dictionary<IPEndPoint, ClientState> clients = new Dictionary<IPEndPoint, ClientState>();

        //public events
        #region Events

        public event CommandDelegate CommandReceived;
        public event ConnectedDelegate ClientAdded;
        public event DisconnectedDelegate ClientRemoved;

        #endregion

        //event invokers
        #region Event Invoke methods

        protected virtual void OnCommandReceived(IPEndPoint ep, NetworkCommand command)
        {
            CommandDelegate handler = CommandReceived;
            if (handler != null) handler(ep, command);
        }

        protected virtual void OnClientAdded(IPEndPoint ep)
        {
            ConnectedDelegate handler = ClientAdded;
            if (handler != null) handler(ep);
        }

        protected virtual void OnClientDisconnect(IPEndPoint ep)
        {
            DisconnectedDelegate handler = ClientRemoved;
            if (handler != null) handler(ep);
        }

        #endregion

        //public property
        public string CertificatePath { get; set; }

        public TcpServer(EndPoint endPoint, bool secure)
        {
            StartServer(endPoint, secure);
        }

        public TcpServer(IPAddress ip, int port, bool secure)
        {
            StartServer(new IPEndPoint(ip, port), secure);
        }

        public TcpServer(string host, int port, bool secure)
        {
            StartServer(new IPEndPoint(IPAddress.Parse(host), port), secure);
        }

        private void StartServer(EndPoint ep, bool ssl)
        {
            socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
            socket.Bind(ep);
            socket.Listen(150);
            this.secure = ssl;

            socket.BeginAccept(AcceptClientCallback, null);
        }

        private void AcceptClientCallback(IAsyncResult ar)
        {
            Socket client = socket.EndAccept(ar);
            var ep = (IPEndPoint) client.RemoteEndPoint;
            var state = new ClientState(ep, client);
            if (secure)
            {
                //TODO : handle client for ssl authentication
            }

            //add client to 
            clients.Add(ep, state);
            OnClientAdded(ep);
            client.BeginReceive(state.Buffer, 0, state.Buffer.Length, SocketFlags.None, ReceiveDataCallback, state);

            //var thread = new Thread(ReceiveDataCallback);
            //thread.Start(state);
        }

        private void ReceiveDataCallback(IAsyncResult ar)
        {
            ClientState state = (ClientState)ar.AsyncState;

            try
            {
                var bytesRead = state.Socket.EndReceive(ar);
                state.Stream.Write(state.Buffer, 0, bytesRead);

                // check available commands
                while (state.Stream.LengthPrefix > 0)
                {
                    NetworkCommand cmd = NetworkCommand.CreateFromStream(state.Stream);
                    OnCommandReceived(state.EndPoint, cmd);
                }

                //start reading data again
                state.Socket.BeginReceive(state.Buffer, 0, state.Buffer.Length, SocketFlags.None, ReceiveDataCallback, state);
            }
            catch (SocketException ex)
            {
                if (ex.NativeErrorCode.Equals(10054))
                {
                    RemoveClient(state.EndPoint);
                }
            }
        }

        private void RemoveClient(IPEndPoint ep)
        {

            OnClientDisconnect(ep);
        }

        protected virtual void Dispose(bool disposing)
        {
            if (disposing)
            {
                //TODO : dispose all the client related socket stuff
            }
        }

        public void Dispose()
        {
            Dispose(true);
        }
    }
}

ThreadPool.QueueUserWorkItem(AsyncDataSent, data); - 在send中调用可能会导致微妙的错误。您是否需要send以FIFO方式工作?即,如果两个send调用A和B非常接近(按顺序),则应首先发生A,然后是B?我不认为ThreadPool.Queue保证FIFO - 因此您可能会遇到难以捕捉的错误... - Vivek
2个回答

3

除非客户端完成当前字节的发送,否则同一客户端将无法向您发送数据。

因此,在服务器端,您将接收到已完成的数据,而不会被来自该客户端的其他新消息中断,但要考虑到如果消息太大,则不会在一次接收中收到所有发送的消息,但最终仍然是一条消息在接收完成后。


2
由于您正在使用TCP,网络协议将确保数据包按发送顺序接收。
关于线程安全性,它取决于您用于发送的实际类。在您提供的代码片段中,声明部分缺失。
根据名称,您似乎在使用Socket,这是线程安全的,因此每次发送实际上都是原子操作。如果使用任何一种Stream,则不是线程安全的,并且需要某种形式的同步,例如锁定,您目前已经在使用它了。
如果要发送大型数据包,则将接收和处理部分拆分为两个不同的线程非常重要。TCP缓冲区实际上比人们想象的要小得多,不幸的是,当缓冲区满时,它不会在日志中进行覆盖,因为协议将继续执行重新发送,直到接收到所有内容。

我已经更新了代码,包括完整的类使用,也许现在你可以更仔细地查看了。实际上,我只对Send方法有疑问,它是否正确发送数据。 - Parimal Raj
好的 - 我认为你可以安全地只使用一个发送,而不是使用缓冲区大小。正如之前所说,我更倾向于在服务器端拥有读取缓冲区,因为这一侧更容易超载。发送较小的包将无济于事,重点在于接收器尽快清空其缓冲区。由于所有内容都被缓冲,因此客户端发送块并没有关系。 - weismat

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接