如何清空服务器的出站消息缓冲区?

4
我已经使用PollingDuplexHttpBinding编写了一个服务,它有一个Silverlight客户端来消费它。我的服务基本上有一定数量的客户端发送他们的数据(非常频繁,每秒钟一次,数据很大,每个调用大约为5KB)到服务端,并且监听其他客户端发送给服务端的新数据以将其路由到他们,这与聊天室架构非常相似。
我注意到的问题是,当客户端通过互联网连接到服务端时,几分钟后服务端的响应变慢,回复变得滞后。我得出结论,当服务端主机的上传容量达到上限(互联网上传速度,在服务器上只有大约15KB / s时),其他客户端发送的消息将被缓冲并在有带宽可用时进行处理。 我想知道如何限制服务使用的缓冲区大小以存储从客户端接收到的消息?对于我的客户端来说,获取所有数据并不那么关键,而是获取其他人发送的最新数据,因此我正在寻找实时连接,即以保证交付为代价。
简而言之,我想能够在服务端填满队列/缓冲区时清理它,或者达到某个上限时开始再次填充它,以消除延迟。我该怎么做?服务端和客户端是否需要减少MaxBufferSize属性?还是我需要在我的服务中编写此功能?有什么想法?
谢谢。
编辑:
这是我的服务架构:
//the service
[ServiceContract(Namespace = "", CallbackContract = typeof(INewsNotification))]
[AspNetCompatibilityRequirements(RequirementsMode = AspNetCompatibilityRequirementsMode.Allowed)]
[ServiceBehavior(ConcurrencyMode = ConcurrencyMode.Multiple, InstanceContextMode = InstanceContextMode.Single)]
public class NewsService
{


private static Dictionary<IChatNotification, string> clients = new Dictionary<IChatNotification, string>();
private ReaderWriterLockSlim subscribersLock = new ReaderWriterLockSlim();

[OperationContract(IsOneWay = true)]
public void PublishNotifications(byte[] data)
{
            try
            {
                subscribersLock.EnterReadLock();
                List<INewsNotification> removeList = new List<INewsNotification>();
                lock (clients)
                {
                    foreach (var subscriber in clients)
                    {
                        if (OperationContext.Current.GetCallbackChannel<IChatNotification>() == subscriber.Key)
                        {
                            continue;
                        }
                        try
                        {
                            subscriber.Key.BeginOnNotificationSend(data, GetCurrentUser(), onNotifyCompletedNotificationSend, subscriber.Key);

                        }
                        catch (CommunicationObjectAbortedException)
                        {
                            removeList.Add(subscriber.Key);
                        }
                        catch (CommunicationException)
                        {
                            removeList.Add(subscriber.Key);
                        }
                        catch (ObjectDisposedException)
                        {
                            removeList.Add(subscriber.Key);
                        }

                    }
                }

                foreach (var item in removeList)
                {
                    clients.Remove(item);
                }
            }
            finally
            {
                subscribersLock.ExitReadLock();
            }
        }

}


//the callback contract
[ServiceContract]
public interface INewsNotification
{
       [OperationContract(IsOneWay = true, AsyncPattern = true)]
       IAsyncResult BeginOnNotificationSend(byte[] data, string username, AsyncCallback callback, object asyncState);
       void EndOnNotificationSend(IAsyncResult result);
}

服务配置如下:
  <system.serviceModel>
    <extensions>
      <bindingExtensions>
        <add name="pollingDuplex" type="System.ServiceModel.Configuration.PollingDuplexHttpBindingCollectionElement, System.ServiceModel.PollingDuplex, Version=4.0.0.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35" />
      </bindingExtensions>
    </extensions>
    <behaviors>
      <serviceBehaviors>
        <behavior name="">

          <serviceMetadata httpGetEnabled="true" />
          <serviceThrottling maxConcurrentSessions="2147483647" />
          <serviceDebug includeExceptionDetailInFaults="true" />
        </behavior>
      </serviceBehaviors>
    </behaviors>
    <bindings>
      <pollingDuplex>

        <binding name="myPollingDuplex" duplexMode="SingleMessagePerPoll" 
                 maxOutputDelay="00:00:00" inactivityTimeout="02:00:00" 
                 serverPollTimeout="00:55:00" sendTimeout="02:00:00"  openTimeout="02:00:00" 
                  maxBufferSize="10000"  maxReceivedMessageSize="10000" maxBufferPoolSize="1000"/>

      </pollingDuplex>
    </bindings>
    <serviceHostingEnvironment aspNetCompatibilityEnabled="true" multipleSiteBindingsEnabled="true" />
    <services>
      <service name="NewsNotificationService.Web.NewsService">
        <endpoint address="" binding="pollingDuplex" bindingConfiguration="myPollingDuplex" contract="NewsNotificationService.Web.NewsService" />
        <endpoint address="mex" binding="mexHttpBinding" contract="IMetadataExchange" />
      </service>
    </services>
  </system.serviceModel>
    <system.webServer>
        <directoryBrowse enabled="true" />
    </system.webServer>
</configuration>

客户端通常会在500毫秒到1000毫秒之间调用服务,方法如下:_client.PublishNotificationAsync(byte[] data);回调函数会通知客户端其他客户端发送的通知:
void client_NotifyNewsReceived(object sender, NewsServiceProxy.OnNewsSendReceivedEventArgs e)
        {
                e.Usernamer//WHich client published the data
                e.data//contents of the notification
        }

因此,简而言之,当客户端数量增加,并且服务主机通过互联网的上传速度受限时,由服务发送给订阅者的消息将被缓存在某个地方并在队列中进行处理,这就是问题的根源。我不知道这些消息被缓存在哪里。在局域网中,服务工作正常,因为服务器的上传速度等于其下载速度(对于100KB/s的来电,它会发送100KB/s的通知)。这些消息被缓存在哪里?如何清除此缓冲区?
我尝试了一些实验来查看这些消息是否被缓存在服务中,我尝试在客户端调用此方法,但即使一个客户端仍在接收他人4-5分钟前发送的通知,它也始终返回0:
[OperationContract(IsOneWay = false)]
public int GetQueuedMessages()
{

            return OperationContext.Current.OutgoingMessageHeaders.Count();
}

我觉得我可能找到了问题所在。我觉得我正在寻找的是错误问题的解决方案。我有一种直觉,这是客户端的线程问题。当客户端进行服务调用时,调用是在UI线程上进行的,这导致了延迟,这就是我认为的问题所在。‘队列’实际上发生在客户端计算机上,并且该队列是发送到服务的内容。我正在解决这个问题,我会向你们报告进展。感谢大家的努力。 - Mohammad Sepahvand
2个回答

4
我为您的情况做了一些数学计算。
  • 消息大小 = 100KB
  • 上传通道 = 15KB/s
  • 下载通道 = 100KB/s
  • 客户端每秒调用服务1-2次

客户端通常在500ms-1000ms之间调用服务

这个正确吗?

对于一个客户端,仅针对消息的下载流量将达到100-200KB/s,这只是消息正文。还会有更多的头部和启用安全性的内容。

消息将被组合成异步调用。因此,如果我们有3个客户端,每个客户端发送一个消息回调包含每个客户端的2个消息。4个客户端 - 每个回调中有3条消息。

对于3个客户端,下载通道中将达到200-400KB/s。

对我来说,消息对于您声明的带宽来说太大了。

请检查您是否可以:

  1. 减少消息大小。我不知道你的业务性质,所以无法在这里给出建议。

  2. 对消息或流量使用压缩。

  3. 增加网络带宽。如果没有这样做,即使是理想的解决方案也会有非常高的延迟。你可以花费数天或数周来优化你的代码,即使你正确地利用了你的网络解决方案,解决方案仍然会很慢。

我知道这听起来像显而易见的事情,但有些问题除非你改变问题,否则就没有正确的答案。

在执行上述步骤后,与自定义代码结合使用,管理回调队列。

ServiceThrottlingBehavior 在达到边界时拒绝请求。

http://msdn.microsoft.com/en-us/library/ms735114(v=vs.100).aspx

这只是一个简单的示例。真实数字应根据您的环境进行特定定义。
<serviceThrottling 
 maxConcurrentCalls="1" 
 maxConcurrentSessions="1" 
 maxConcurrentInstances="1"
/>

更新:

我在问题中犯了一个大错误,每个调用都是5KB/s。

即使使用5KB的消息,15KB/s也不足够。 让我们计算仅3个用户的流量。 每秒钟3个传入消息。 系统现在应该使用双工通知用户其他人发送了什么。 每个用户都应该从他的合作伙伴那里收到消息。 我们总共有3条消息。 一条(属于发件人)可能会被跳过,因此每个用户都应该收到2条消息。 3个用户将获得10KB(5KB + 5KB =一条消息10KB)= 30KB。每秒钟一条消息将在上传通道中为3个用户产生30KB / s的流量。

这些消息被缓冲在哪里?我该如何清除此缓冲区?

这取决于您如何托管您的服务。 如果它是自托管服务,则根本不会缓冲。 您有尝试向接收器发送消息的代码,但速度非常慢,因为通道已经被淹没。 对于IIS,可能会进行一些缓冲,但最好不要从外部触摸它。

正确的解决方案是限制带宽。在带宽有限的情况下,您不应尝试向所有客户端发送所有消息。相反,您可以例如限制发送消息的线程数量。因此,对于以上3个用户,您可以将它们顺序发送,或者决定只有一个用户会在该轮中获得更新,下一个用户在下一轮中获得更新。
因此,总体思路不是在有数据时尝试将所有内容立即发送给每个人,而是仅发送您能够承担的数据量,并使用线程计数或响应速度进行控制。

我已经尝试了很多这些设置。我会再看一下并告诉你它的效果如何,谢谢。 - Mohammad Sepahvand
+1 我认为这是一个非常好的解决方案。如果服务器太忙,拒绝客户端并让客户端处理它。 - flayn
@ClunkyCoder 请阅读我的回答。如果您的通道速度为15KB/s,那么发送>100 KB/s是不可能的。那是你的队列。这在物理上是不可能的。你的路上有巨大的交通堵塞,而你却问“哪辆车应该先离开车库?”实际上并不重要。所有的车都会被困在交通中。 - Dmitry Harnitski
@dharnitski,你说得对。在问题上我犯了一个大错误,每个调用是5KB/s,我错误地测量了100KB/s,我不知道我从哪里得到的:@正如你所提到的,如果每个调用的大小为100KB,即使只有几秒钟,也不可能向一个客户端发送数据(服务器需要100-200KB/s的上传速度而不是15KB/s!)这种情况在局域网上也会发生,但不太频繁。我刚刚再次测试了我的服务,使用3台计算机,消息在一段时间后被严重排队,但比在互联网上要少得多。我相信这不完全是带宽问题。 - Mohammad Sepahvand
@dharnitski,每当服务从客户端接收消息时,服务器会以5KB/s的速度接收到消息,即使客户端每秒发送2条消息,每条消息也只有2.5KB/s的速度。5KB/s是我期望服务从一个客户端接收到的平均值。我可以百分之百确定这不是带宽问题,即使在无线局域网上(上传速度为10MB/s),如果有4个客户端,每个客户端都发送5KB/s的数据,那么服务主机将每秒接收20KB/s的数据,并发送相同数量的数据(20KB)。我注意到出现了延迟。我寻找了其他可能的原因,我认为我知道问题出在哪里了。 - Mohammad Sepahvand
显示剩余2条评论

1

MaxBufferSize不会帮助您解决这个问题。您需要自己编写代码,我不知道是否有任何现有的解决方案/框架。不过,它听起来确实是一个有趣的问题。您可以通过维护每个连接的客户端的Queue<Message>来开始,并在将消息推送到此队列(或当客户端调用出队时)时重新评估应发送哪些Message

更新:首先,我建议您放弃尝试从客户端和配置中进行此操作,您需要自己编写代码

在这里,我可以看到您正在向客户端发送:

 subscriber.Key.BeginOnNotificationSend(data, GetCurrentUser(), onNotifyCompletedNotificationSend, subscriber.Key);

因此,您应该将这些通知推送到一个Queue<byte[]>而不是异步地推送到每个客户端。每个客户端连接都将拥有自己的队列,您可能需要构建一个专门为每个客户端连接设计的类:

请注意,此代码不能直接编译,并且可能存在一些逻辑错误,请仅用作指南

public class ClientConnection
{
    private INewsNotification _callback;
    private Queue<byte> _queue = new Queue<byte>();
    private object _lock = new object();
    private bool _isSending
    public ClientConnection(INewsNotification callBack)
    {
           _callback=callback;
    }
    public void Enqueue(byte[] message)
    { 
       lock(_lock)
       {               
           //what happens here depends on what you want to do. 
           //Do you want to only send the latest message? 
           //if yes, you can just clear out the queue and put the new message in there,                         
           //or you could keep the most recent 5 messages.                
           if(_queue.Count > 0)
               _queue.Clear();
          _queue.Enqueue(message);
           if(!_isSending)
               BeginSendMessage();
       }
    }

    private void BeginSendMessage()
    {
       _isSending=true;
        _callback.BeginOnNotificationSend(_queue.Dequeue(), GetCurrentUser(), EndCallbackClient, subscriber.Key);

    }

    private void EndCallbackClient(IAsyncResult ar)
    {
        _callback.EndReceive(ar);
        lock(_lock)
        {
           _isSending=false;
           if(_queue.Count > 0)
              BeginSendMessage();//more messages to send
        }
    }
}

想象一种情况,其中一个消息被推送到客户端,同时发送9个更多的消息调用ClientConnection.Enqueue。当第一个消息完成时,它会检查队列,该队列应该只包含最后一个(第9个)消息。

谢谢,我开始尝试做类似于这个建议的事情,编写一个服务方法来返回OperationContext.Current.OutgoingMessageProperties.Count(),然后在客户端调用它,即使发送方停止发送一段时间后仍有数百条消息从缓冲区转发到客户端,它仍然不断地返回0。你知道这个属性实际上是什么吗? - Mohammad Sepahvand
为什么需要那个属性?你是如何从服务器向客户端发送消息的(请贴一些代码)? - wal

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接