RabbitMQ producer in C# (.NET 5.0): memory leak

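I was wondering if anyone could help with the following problem:

I am unable to resolve a memory leak in a RabbitMQ publisher written in C# and targeting .NET 5.0.

This is the csproj file: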

<Project Sdk="Microsoft.NET.Sdk">   
  <PropertyGroup>
    <OutputType>Exe</OutputType>
    <TargetFramework>net5.0</TargetFramework>
    <RuntimeIdentifier>win-x64</RuntimeIdentifier>
  </PropertyGroup>
  ...
</Project>  

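I have a .NET console application running in a virtual machine. It connects to a server through an API (registered as a 64-bit DLL and referenced as a COM reference), retrieves information from that server, and then tries to publish it to a RabbitMQ machine hosted in the AWS cloud (a load balancer in front of several nodes of this RMQ instance).
The API is accessed in code as follows: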
        private void SetUpApi () {
            Utils.log.Info (_api.SetAPIOptions ("<CONNECTIONOPTIONS><CALCULATED_PRICES Enabled='true' MaximumDepth='4'/></CONNECTIONOPTIONS>"));
            _api.OnServerConnect += OnServerConnect;
            _api.OnServerDisconnect += OnServerDisconnect;
            _api.OnNewData += OnNewData;            
        }

        private void OnNewData(string strXML){
            try{
                if (strXML.Contains("<ORDER")){
                    ParseXMLAnswer(strXML, "OnNewData ()");
                }
            }
            catch (Exception ex) {
                if (ex.InnerException is AlreadyClosedException || ex.InnerException is BrokerUnreachableException)
                    Utils.log.Error("OnNewData () RabbitMQ.Client.Exceptions.AlreadyClosedException ");
                else
                    Utils.printException("OnNewData ()", ex);
            }
        }
        
        private void ParseXMLAnswer(string strOutputXML, string caller) {
            XmlDocument doc = new XmlDocument();
            doc.LoadXml(strOutputXML);
            string jsonText = JsonConvert.SerializeXmlNode(doc);
            var o = JObject.Parse(jsonText);

            if (o["APIDATA"]["ORDER"].Type is JTokenType.Object){
                JObject order = (JObject)o["APIDATA"]["ORDER"];

                SendOrderToRMQ(order);
            }
            else if (o["APIDATA"]["ORDER"].Type is JTokenType.Array){
                JArray orders = (JArray)o["APIDATA"]["ORDER"];
                foreach (var item in orders.Children()){
                    SendOrderToRMQ((JObject)item);
                }
            }
            doc = null;
            GC.Collect();
            GC.WaitForPendingFinalizers();
        }

        private void SendOrderToRMQ (JObject order){
            JObject instrSpeciefier = (JObject) order["INSTSPECIFIER"];

            var firstSeqID = instrSpeciefier.GetValue("@FirstSequenceID").ToString();
            var firstSeqItemID = instrSpeciefier.GetValue("@FirstSequenceItemID").ToString();
                       
            if (sequenceItemsHashed.ContainsKey(firstSeqID) &&
                sequenceItemsHashed[firstSeqID].Contains(firstSeqItemID)){
                string itemName = Utils.ReplaceSensitiveCharacthers(instrSpeciefier.GetValue("@FirstSequenceItemName").ToString());
                string instrumentName = Utils.ReplaceSensitiveCharacthers(instrSpeciefier.GetValue("@InstName").ToString());

                int index = sequenceItemsHashed[firstSeqID].IndexOf(firstSeqItemID) + 1;
                var binding = instrumentName + "." + sequencesFromInstruments[firstSeqID] + "." + itemName + "." + index;

                serviceInstance1.Publish(
                   order.ToString(),
                   _exchangeName,
                   "",
                   binding);
            }
            order = null;
            instrSpeciefier = null;           
        }

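During peak business hours I receive roughly 400-500 messages per second from the API. The messages are in XML format, and a single message can contain multiple orders, as in the example below. One element might carry an action that inserts (creates) an order, while another carries an action that removes a specific order.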
<?xml version="1.0" encoding="utf-16"?>
  <APIDATA xmlns="api-com">
  <ORDER EngineID="0" PersistentOrderID="2791" ...>
    <INSTSPECIFIER InstID="287" ... />
    ...
  </ORDER>
  <ORDER EngineID="0" PersistentOrderID="9840" ...>
    <INSTSPECIFIER InstID="288" ... />
    ...
  </ORDER>

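The RabbitMQ server is already configured, and I connect to it over SSL (with a certificate and key). I use RabbitMQ.Client v6.2.1 to connect to RMQ. The exchanges, queues, and bindings are already defined in RabbitMQ; my producer application only needs to connect and start publishing.

An important point is that I use the synchronous publish method, because the order of the messages we receive matters a great deal. For example, one message may tell us to create an order and the very next message may tell us to remove that same order. If I published to RMQ asynchronously, the remove action could arrive before the insert action.

Insert message: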

  <?xml version="1.0" encoding="utf-16"?>
  <APIDATA xmlns="api-com">
  <ORDER ... PersistentOrderID="2791" OrderID="1234" ... Action="Insert" ...>
      ...
  </ORDER>

Remove message:

  <?xml version="1.0" encoding="utf-16"?>
  <APIDATA xmlns="api-com">
  <ORDER ... PersistentOrderID="2791" OrderID="1234" ... Action="Remove" ...>
      ...
  </ORDER>

To publish to RMQ I use an object pool (Microsoft provides a package named Microsoft.Extensions.ObjectPool), following the approach described here: https://www.c-sharpcorner.com/article/publishing-rabbitmq-message-in-asp-net-core/
This is the code I use:
class RabbitManager : IRabbitManager
{
    private readonly DefaultObjectPool<IModel> _objectPool;
    public RabbitManager(IPooledObjectPolicy<IModel> objectPolicy){
        _objectPool = new DefaultObjectPool<IModel>(objectPolicy, Environment.ProcessorCount * 2);
    }

    public void Publish<T>(T message, string exchangeName, string exchangeType, string routeKey) where T : class {
        if (message == null)
            return;

        var channel = _objectPool.Get();
        try{
            var sendBytes = Encoding.UTF8.GetBytes(message.ToString());
            var properties = channel.CreateBasicProperties();
            properties.ContentType = "application/json";
            properties.DeliveryMode = 1; // Doesn't persist to disk
            properties.Timestamp = new AmqpTimestamp(DateTimeOffset.UtcNow.ToUnixTimeMilliseconds());

            channel.BasicPublish(exchangeName, routeKey, properties, sendBytes);
        }
        catch (Exception ex) {
            throw ex;
        }
        finally {
            _objectPool.Return(channel);
        }
    }
}

public class RabbitModelPooledObjectPolicy : IPooledObjectPolicy<IModel>
{
    private readonly RabbitOptions _options;
    private readonly IConnection _connection;

    public RabbitModelPooledObjectPolicy(RabbitOptions _options){
        this._options = _options;
        _connection = GetConnection();
    }

    private IConnection GetConnection() {
        var factory = new ConnectionFactory() {
            HostName = _options.HostName,
            UserName = _options.UserName,
            Password = _options.Password,
            //Port = _options.Port,
            VirtualHost = _options.VHost,
        };

        if (!String.IsNullOrEmpty(_options.CertPath))
        {
            factory.RequestedConnectionTimeout = TimeSpan.FromMilliseconds(5000);
            factory.Ssl.AcceptablePolicyErrors = SslPolicyErrors.RemoteCertificateNameMismatch | SslPolicyErrors.RemoteCertificateChainErrors;
            factory.Ssl.CertificateValidationCallback += new RemoteCertificateValidationCallback(ValidateServerCertificate);
            factory.Ssl.ServerName = _options.HostName;
            factory.Ssl.CertPath = _options.CertPath;
            factory.Ssl.CertPassphrase = _options.CertPass;
            factory.Ssl.Version = SslProtocols.Tls12;
            factory.Ssl.Enabled = true;
        }

        factory.RequestedHeartbeat = TimeSpan.FromSeconds(1);
        factory.AutomaticRecoveryEnabled = true;        // enable automatic connection recovery
        factory.RequestedChannelMax = 32;

        var _connection = factory.CreateConnection();
        _connection.ConnectionShutdown += Connection_ConnectionShutdown;

        return _connection;
    }

    private void Connection_ConnectionShutdown(object sender, ShutdownEventArgs e){
        Utils.log.Info("Connection broke!");
    }

    private bool ValidateServerCertificate(object sender, X509Certificate certificate, X509Chain chain, SslPolicyErrors sslPolicyErrors){
        return true;
    }

    public IModel Create(){
        return _connection.CreateModel();
    }

    public bool Return(IModel obj) {
        if (obj.IsOpen) {
            return true;
        }
        else {
            obj?.Dispose();
            return false;
        }
    }
}

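Here is a screenshot of the problem, a constant increase in memory:

This is the stack trace from a memory snapshot taken after the screenshot above:

Shortly after the screenshot above was taken, I got the following error messages in my program's console: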

26-04-2021 10:41:48 - OnNewData () RabbitMQ.Client.Exceptions.AlreadyClosedException: Already closed: The AMQP operation was interrupted: AMQP close-reason, initiated by Library, code=0, text='End of stream', classId=0, methodId=0, cause=System.IO.EndOfStreamException: Reached the end of the stream. Possible authentication failure.
   at RabbitMQ.Client.Impl.InboundFrame.ReadFrom(Stream reader, Byte[] frameHeaderBuffer)
   at RabbitMQ.Client.Framing.Impl.Connection.MainLoopIteration()
   at RabbitMQ.Client.Framing.Impl.Connection.MainLoop()
   at RabbitMQ.Client.Framing.Impl.Connection.EnsureIsOpen()
   at RabbitMQ.Client.Framing.Impl.AutorecoveringConnection.CreateModel()
   at RabbitModelPooledObjectPolicy.Create() in C:\Users\user\Desktop\Projects\ConsoleCoreApp1\RabbitMQ\RabbitModelPooledObjectPolicy.cs:line 77
   at Microsoft.Extensions.ObjectPool.DefaultObjectPool`1.Create()
   at Microsoft.Extensions.ObjectPool.DefaultObjectPool`1.Get()
   at RabbitManager.Publish[T](T message, String exchangeName, String exchangeType, String routeKey) in C:\Users\user\Desktop\Projects\ConsoleCoreApp1\RabbitMQ\RabbitManager.cs:line 32
   at ConsoleApp1.Service1.SendOrderToRMQ(JObject order) in C:\Users\user\Desktop\Projects\ConsoleCoreApp1\Service1.cs:line 411
   at ConsoleApp1.Service1.ParseXMLAnswer(String strOutputXML, String caller) in C:\Users\user\Desktop\Projects\ConsoleCoreApp1\Service1.cs:line 372
   at ConsoleApp1.Service1.OnNewData(String strXML) in C:\Users\user\Desktop\Projects\ConsoleCoreApp1\Service1.cs:line 348
26-04-2021 10:41:48 - OnNewData () RabbitMQ.Client.Exceptions.AlreadyClosedException: Already closed: The AMQP operation was interrupted: AMQP close-reason, initiated by Library, code=0, text='End of stream', classId=0, methodId=0, cause=System.IO.EndOfStreamException: Reached the end of the stream. Possible authentication failure.
   at RabbitMQ.Client.Impl.InboundFrame.ReadFrom(Stream reader, Byte[] frameHeaderBuffer)
   at RabbitMQ.Client.Framing.Impl.Connection.MainLoopIteration()
   at RabbitMQ.Client.Framing.Impl.Connection.MainLoop()
   at RabbitMQ.Client.Framing.Impl.Connection.EnsureIsOpen()
   at RabbitMQ.Client.Framing.Impl.AutorecoveringConnection.CreateModel()
   at RabbitModelPooledObjectPolicy.Create() in C:\Users\user\Desktop\Projects\ConsoleCoreApp1\RabbitMQ\RabbitModelPooledObjectPolicy.cs:line 77
   at Microsoft.Extensions.ObjectPool.DefaultObjectPool`1.Create()
   at Microsoft.Extensions.ObjectPool.DefaultObjectPool`1.Get()
   at RabbitManager.Publish[T](T message, String exchangeName, String exchangeType, String routeKey) in C:\Users\user\Desktop\Projects\ConsoleCoreApp1\RabbitMQ\RabbitManager.cs:line 32
   at ConsoleApp1.Service1.SendOrderToRMQ(JObject order) in C:\Users\user\Desktop\Projects\ConsoleCoreApp1\Service1.cs:line 411
   at ConsoleApp1.Service1.ParseXMLAnswer(String strOutputXML, String caller) in C:\Users\user\Desktop\Projects\ConsoleCoreApp1\Service1.cs:line 372
   at ConsoleApp1.Service1.OnNewData(String strXML) in C:\Users\user\Desktop\Projects\ConsoleCoreApp1\Service1.cs:line 348


This eventually caused the program to crash:

What I did to try to prevent the memory leak:

  1. Implemented the IDisposable interface in my classes:
        public void Dispose()
        {
            Dispose(true);
            GC.SuppressFinalize(this);
        }

        protected virtual void Dispose(bool disposing)
        {
            if (disposing)
            {
                // free managed resources
                _onTimePerHour.Dispose();
                _api.OnNewData -= OnNewData;
            }
            // free native resources if there are any.
        }

  2. Forced a garbage collection after every received message:
private void ParseXMLAnswer(string strOutputXML, string caller) {
            ...
            doc = null;
            GC.Collect();
            GC.WaitForPendingFinalizers();
}

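This helped somewhat: memory still grows, but now it takes longer before the problem appears.
I used the ReSharper plugin for Visual Studio to get a better understanding of the stack trace behind the memory problem, but it did not help much.

What I think the problem is

The RabbitMQ producer application receives a large number of messages per second, parses them, splits them into multiple JSON messages, and sends them to RMQ over the same channel. The following might be happening:

  • I publish on a single RMQ channel, but should somehow be using several channels (one connection, multiple channels);
  • I receive more messages than I can parse and send through RMQ with the RabbitMQ.Client .NET library;
  • I keep references in memory to some objects (perhaps messages) that are never released.

Has anyone run into this before? I cannot find any information anywhere about this "SingleProducerSingleConsumerQueue+Segment<Memory> out of memory" problem.

Does anyone know how to analyse this problem in more depth?

Many thanks!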


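Edit 1

I think more information is needed to solve this memory problem.

I have several consumers that consume data from RabbitMQ (for example NodeJS and Python applications). I therefore need to design the RabbitMQ producer in a generic way, because each consumer needs different data, and I cannot modify and restart the producer every time a new consumer application appears.

For example, each consumer has its own dedicated queue and dedicated bindings. Say I have a consumer named consumer1 with queue cons1 and the binding:

  • marketName.productName.*.1 (productName corresponds to a day).

This binding is dynamic: right now it corresponds to Monday (April 4th), but tomorrow it will correspond to Tuesday (April 5th).

So I need to keep the market names and product names in memory.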

private static readonly Dictionary<string, List<string>> sequenceItemsHashed = new Dictionary<string, List<string>>();
private static readonly Dictionary<string, string> sequencesFromInstruments = new Dictionary<string, string>();

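Note that, logically, sequenceItemsHashed corresponds to the marketNames and sequencesFromInstruments to the productNames.
This way I send all messages to RMQ and let the bindings do the sorting inside RMQ afterwards.

Edit 2

As far as I understand, to solve my problem I need something like the following architecture:

So: a single connection to the RMQ server, with multiple threads and one channel per thread.


Edit 3

I have implemented the pipeline, a ConcurrentQueue, and a consumer thread that pushes to RMQ, but the memory problem is still there: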

private readonly TransformBlock<string, string> orderFilter;
private readonly TransformBlock<string, JObject> xmlParser;
//private readonly TransformBlock<XmlDocument, JObject> xmlToJsonTransformer;
private readonly TransformManyBlock<JObject, JToken> jsonOrderFactory;
private readonly ActionBlock<JToken> messageSender;

ConcurrentQueue<JToken> concurrentQueue = new ConcurrentQueue<JToken>();

public Service1 (string [] args) {
    ...
    // setup pipeline blocks
    orderFilter = new TransformBlock<string, string>(FilterIncomingMessages);
    xmlParser = new TransformBlock<string, JObject>(ParseXml);
    jsonOrderFactory = new TransformManyBlock<JObject, JToken>(CreateOrderMessages);
    messageSender = new ActionBlock<JToken>(SendMessage);

    // build your pipeline            
    orderFilter.LinkTo(xmlParser, x => !string.IsNullOrEmpty(x));
    orderFilter.LinkTo(DataflowBlock.NullTarget<string>()); // for non-order msgs

    xmlParser.LinkTo(jsonOrderFactory);
    jsonOrderFactory.LinkTo(messageSender, new DataflowLinkOptions { PropagateCompletion = true });

    Task t2 = Task.Factory.StartNew(() =>
            {
                while (true) { 
                    if (!concurrentQueue.IsEmpty)
                    {
                        JToken number;
                        while (concurrentQueue.TryDequeue(out number))
                        {
                            _rabbitMQ.PublishMessages(
                                Encoding.ASCII.GetBytes(number.ToString()),
                                "test"
                            );
                        }
                    } else
                    {
                        Thread.Sleep(1);
                    }
                }
            });        
     ...
}

private string FilterIncomingMessages(string strXml){
    if (strXml.Contains("<ORDER")) return strXml;
    return null;
}

private JObject ParseXml(string strXml){
    XmlDocument doc = new XmlDocument();
    doc.LoadXml(strXml);
    string jsonText = JsonConvert.SerializeXmlNode(doc);
    var o = JObject.Parse(jsonText);
    return o;
}

private IEnumerable<JToken> CreateOrderMessages(JObject o){
    List<JToken> myList = new List<JToken>();
            if (o.ContainsKey("GV8APIDATA")){
                if (o["GV8APIDATA"]["ORDER"].Type is JTokenType.Object){
                    JToken order = o["GV8APIDATA"]["ORDER"];
                    myList.Add(order);
                }
                else if (o["GV8APIDATA"]["ORDER"].Type is JTokenType.Array){
                    JToken orders = o["GV8APIDATA"]["ORDER"];
                    foreach (var order in orders.Children()){
                        myList.Add(order);
                    }
                }
            }
            return myList.ToArray ();
        }

private void SendMessage(JToken order){
    concurrentQueue.Enqueue(order);
}

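The new solution helped break the logic into several small parts, but memory still grows constantly.


Edit 4

Taking @Fildor's answer into account, I did the following:

Instead of converting the strings containing <ORDER ...> elements to JSON, I now use the pipeline and the code below to deserialize the XML into objects.

I removed the thread and ConcurrentQueue part and publish directly in the final ActionBlock.

This solved my memory leak, but other problems remain:

  • If the messages are large enough, I can only publish about 120 messages per second. If I publish just the simple string "test", I get a rate of 1780 messages per second.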

public Service1 (string [] args) {
            ...             
            // setup pipeline blocks
            orderFilter = new TransformBlock<string, string>(FilterIncomingMessages);
            xmlParser = new TransformBlock<string, OrdersResponse>(ParseXml);
            jsonOrderFactory = new TransformManyBlock<OrdersResponse, Order>(CreateOrderMessages);
            messageSender = new ActionBlock<Order>(SendMessage);

            // build your pipeline            
            orderFilter.LinkTo(xmlParser, x => !string.IsNullOrEmpty(x));
            orderFilter.LinkTo(DataflowBlock.NullTarget<string>()); // for non-order msgs
            xmlParser.LinkTo(jsonOrderFactory);
            jsonOrderFactory.LinkTo(messageSender, new DataflowLinkOptions { PropagateCompletion = true });

            RunAsConsole(args);
        }

        private readonly TransformBlock<string, string> orderFilter;
        private readonly TransformBlock<string, OrdersResponse> xmlParser;
        private readonly TransformManyBlock<OrdersResponse, Order> jsonOrderFactory;
        private readonly ActionBlock<Order> messageSender;

        private void OnNewData(string strXML){
            orderFilter.Post(strXML); 
        }        

        private string FilterIncomingMessages(string strXml){
            if (strXml.Contains("<ORDER")) return strXml;
            return null;
        }

        private OrdersResponse ParseXml(string strXml) {
            var rootDataObj = DeserializeOrdersFromXML(strXml);
            return rootDataObj;
        }

        private OrdersResponse DeserializeOrdersFromXML(string strOutputXML){
            var xsExpirations = new XmlSerializer(typeof(OrdersResponse));
            OrdersResponse rootDataObj = null;
            using (TextReader reader = new StringReader(strOutputXML)) {
                rootDataObj = (OrdersResponse)xsExpirations.Deserialize(reader);
                reader.Close();
            }
            return rootDataObj;
        }

        private IEnumerable<Order> CreateOrderMessages(OrdersResponse o){
            return o.orders;
        }

        private void SendMessage(Order order) {
            _rabbitMQ.PublishMessages(
                    Encoding.ASCII.GetBytes(order.ToString()),
                    "test"
                );
        }
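
One detail in the deserialization step above is that a new XmlSerializer is constructed for every message. A minimal sketch of creating it once and reusing it follows. The types `SampleOrdersResponse` and `SampleOrder` and the helper `OrderXml` are hypothetical, trimmed-down stand-ins (the real OrdersResponse class is not shown in the question), and the root element name and namespace are taken from the sample XML earlier in the question.

```csharp
using System.Collections.Generic;
using System.IO;
using System.Xml.Serialization;

// Hypothetical, trimmed-down model types for illustration only.
[XmlRoot(ElementName = "APIDATA", Namespace = "api-com")]
public class SampleOrdersResponse
{
    // Each <ORDER> child element becomes one list entry.
    [XmlElement(ElementName = "ORDER", Namespace = "api-com")]
    public List<SampleOrder> Orders { get; set; } = new List<SampleOrder>();
}

public class SampleOrder
{
    [XmlAttribute(AttributeName = "PersistentOrderID")]
    public string PersistentOrderID { get; set; }

    [XmlAttribute(AttributeName = "Action")]
    public string Action { get; set; }
}

public static class OrderXml
{
    // Created once and reused for every message; XmlSerializer is documented as thread safe.
    private static readonly XmlSerializer Serializer =
        new XmlSerializer(typeof(SampleOrdersResponse));

    public static SampleOrdersResponse Deserialize(string xml)
    {
        // The using block disposes the reader; no explicit Close() is needed.
        using (var reader = new StringReader(xml))
        {
            return (SampleOrdersResponse)Serializer.Deserialize(reader);
        }
    }
}
```

Calling `OrderXml.Deserialize(strXml)` from the parsing block would then replace the per-message construction. Whether this changes the measured throughput is something only a benchmark on the real messages can show.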

And the Order object looks like this:

    [Serializable()]
    [XmlRoot (ElementName = "ORDER")]
    public class Order : IDisposable {

        public void Dispose()
        {
            EngineID = null;
            PersistentOrderID = null;
            ...
            InstrumentSpecifier.Dispose();
            InstrumentSpecifier = null;
            GC.SuppressFinalize(this);
        }

        [XmlAttribute (AttributeName = "EngineID")]
        public string EngineID { get; set; }
        [XmlAttribute (AttributeName = "PersistentOrderID")]
        public string PersistentOrderID { get; set; }
        ... 
        [XmlElement(ElementName = "INSTSPECIFIER")]
        public InstrumentSpecifier InstrumentSpecifier { get; set; }
    }

My new RabbitMQ class:

public class RMQ : IDisposable {
    private IConnection _connection;
    public IModel Channel { get; private set; }        
    private readonly ConnectionFactory _connectionFactory;
    private readonly string _exchangeName;

    public RMQ (RabbitOptions _rabbitOptions){
        try{
            // _connectionFactory initialization
            _connectionFactory = new ConnectionFactory()
            {
                HostName = _rabbitOptions.HostName,
                UserName = _rabbitOptions.UserName,
                Password = _rabbitOptions.Password,
                VirtualHost = _rabbitOptions.VHost,
            };
            this._exchangeName = _rabbitOptions.ExchangeName;

            if (!String.IsNullOrEmpty(_rabbitOptions.CertPath)){
                _connectionFactory.RequestedConnectionTimeout = TimeSpan.FromMilliseconds(5000);
                _connectionFactory.Ssl.AcceptablePolicyErrors = SslPolicyErrors.RemoteCertificateNameMismatch | SslPolicyErrors.RemoteCertificateChainErrors;
                _connectionFactory.Ssl.CertificateValidationCallback += new RemoteCertificateValidationCallback(ValidateServerCertificate);
                _connectionFactory.Ssl.ServerName = _rabbitOptions.HostName;
                _connectionFactory.Ssl.CertPath = _rabbitOptions.CertPath;
                _connectionFactory.Ssl.CertPassphrase = _rabbitOptions.CertPass;
                _connectionFactory.Ssl.Version = SslProtocols.Tls12;
                _connectionFactory.Ssl.Enabled = true;
            }

            _connectionFactory.RequestedHeartbeat = TimeSpan.FromSeconds(1);
            _connectionFactory.AutomaticRecoveryEnabled = true;        // enable automatic connection recovery
            //_connectionFactory.RequestedChannelMax = 10;

            if (_connection == null || _connection.IsOpen == false){
                _connection = _connectionFactory.CreateConnection();
                _connection.ConnectionShutdown += Connection_ConnectionShutdown;
            }
            if (Channel == null || Channel.IsOpen == false){
                Channel = _connection.CreateModel();
            }
            Utils.log.Info("ConnectToRabbitMQ () Connecting to RabbitMQ. rabbitMQenvironment = ");
        }
        catch (Exception ex){
            Utils.log.Error("Connection to RabbitMQ failed ! HostName = " + _rabbitOptions.HostName + " VirtualHost = " + _rabbitOptions.VHost);
            Utils.printException("ConnectToRMQ ()", ex);
        }
    }        

    private void Connection_ConnectionShutdown(object sender, ShutdownEventArgs e){
        Utils.log.Info ("Connection broke!");
        try{
            if (ReconnectToRMQ()){
                Utils.log.Info("Connected!");
            }
        }
        catch (Exception ex){
            Utils.log.Info("Connect failed!" + ex.Message);
        }
    }

    private bool ReconnectToRMQ(){
        if (_connection == null || _connection.IsOpen == false){
            _connection = _connectionFactory.CreateConnection();
            _connection.ConnectionShutdown += Connection_ConnectionShutdown;                
        }

        if (Channel == null || Channel.IsOpen == false){
            Channel = _connection.CreateModel();
            return true;
        }
        return false;
    }

    private bool ValidateServerCertificate (object sender, X509Certificate certificate, X509Chain chain, SslPolicyErrors sslPolicyErrors) {
        return true;
    }

    public void DisconnectFromRMQ () {
        Channel.Close ();
        _connection.Close ();
    }     

    public void Dispose(){
        try{
            Channel?.Close();
            Channel?.Dispose();
            Channel = null;

            _connection?.Close();
            _connection?.Dispose();
            _connection = null;
        }
        catch (Exception e){
            Utils.log.Error("Cannot dispose RabbitMQ channel or connection" + e.Message);
        }
    }

    public void PublishMessages (byte [] message, string routingKey) {            
        if (this._connection == null || ! _connection.IsOpen) {
            Utils.log.Error ("PublishMessages(), Connect failed! this.conn == null || !conn.IsOpen ");
            ReconnectToRMQ();
        } else { 
            var properties = Channel.CreateBasicProperties();
            properties.Persistent = true;

            Channel.BasicPublish (_exchangeName, routingKey, properties, message);
            //serviceInstance1.Publish(message, _rabbitOptions.ExchangeName, "", routingKey);
        }
    }
}

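The interesting thing now is that if I publish only a small string (such as "test") to a predefined queue in RabbitMQ, I can publish more than 1780 messages per second.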

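"Freeing memory manually with free()": actually, that is not what you are doing. You are merely suggesting to the runtime that it run a garbage collection. If you find yourself thinking you need to do that, you have a different problem. Here it looks like allocations and/or references are being held for too long. So I would set up a benchmark to get actual measurements, then tweak each identified bottleneck one at a time and compare against the benchmark. Keep what improves things; discard changes that improve nothing. - Fildor
Is SetUpApi called at most once? If not, how many times is it called? If you are that constrained on memory, I also wonder why you juggle so many strings. You go from a string to an (old) XmlDocument, only to serialize it to a JSON string so that you can build a JObject from it. I think you could reduce the string sizes by converting to an XDocument just once, finding the ORDER XElement nodes, and calling JsonConvert.SerializeXNode on each single order node only. Beyond that, as Fildor indicated, you may still need to offload the work from the event thread. - rene
"The ParseAnswer() method is called whenever an event arrives from the company that provides the API." Exactly: one by one, on the same thread. See where that leads? - Fildor
No, do not start a thread per message. Just decouple the event handling from the processing. - Fildor
@rene Have you tried removing operation types one at a time to isolate the leak to a particular stage of the pipeline? Also, a pipeline architecture as described above may help (for example, an internal concurrent queue with a dedicated consumer thread for the transcoding step, another queue for sending, and so on). Finally, are you sure this is a memory leak rather than memory pressure? The two are very different. What happens when the application returns to a steady state (no messages for a while)? Is the memory released, or does it stay? - Kit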
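
A crude way to tell a leak from memory pressure, sketched here with BCL calls only: log the managed heap size after a forced full collection at intervals, and compare the values taken during a burst with those taken once the feed has been quiet for a while. The class name `MemoryProbe` is hypothetical. Note that this only sees the managed heap; native memory held by the COM API would not show up in these numbers.

```csharp
using System;

public static class MemoryProbe
{
    // Managed heap size in bytes after a full, blocking collection.
    // If this keeps climbing across idle periods, objects are still referenced (a leak);
    // if it falls back once the burst is over, the growth was memory pressure.
    public static long ManagedBytesAfterFullGc()
    {
        return GC.GetTotalMemory(forceFullCollection: true);
    }

    // One log line with the heap size and the number of collections per generation.
    public static string Snapshot()
    {
        return $"managed={ManagedBytesAfterFullGc()} B, gen0={GC.CollectionCount(0)}, " +
               $"gen1={GC.CollectionCount(1)}, gen2={GC.CollectionCount(2)}";
    }
}
```

Writing `MemoryProbe.Snapshot()` to the log every minute or so is enough to see the trend. This is a diagnostic aid only; forcing collections like this is not something to leave in the hot path.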
1 Answer

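First of all, it looks like you are blocking the event-handling thread, so I would decouple the event handling from the actual processing:
(Untested! Just an outline!)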
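
The code that accompanied this outline is not reproduced here. As a stand-in, and not the answerer's own code, here is a minimal sketch of the idea under stated assumptions: the event handler only posts the raw string into a TPL Dataflow block and returns at once, while the block processes the items one at a time in arrival order. The class name `DecoupledHandler` and the `process` delegate are hypothetical.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical helper: the API event handler calls OnNewData and returns immediately;
// the ActionBlock runs the (slow) processing on thread-pool threads.
public sealed class DecoupledHandler
{
    private readonly ActionBlock<string> processor;

    public DecoupledHandler(Action<string> process)
    {
        // MaxDegreeOfParallelism = 1 (also the default) handles one item at a time in FIFO order,
        // which keeps an Insert ahead of the Remove that follows it.
        processor = new ActionBlock<string>(
            process,
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 });
    }

    // Intended wiring (assumption): _api.OnNewData += handler.OnNewData;
    public void OnNewData(string strXml)
    {
        // Post only enqueues the item; it does not wait for processing.
        processor.Post(strXml);
    }

    // Stop accepting input and wait until the queued items have been processed.
    public Task CloseAsync()
    {
        processor.Complete();
        return processor.Completion;
    }
}
```

The event thread is then only ever busy for the duration of one `Post` call, regardless of how long parsing and publishing take.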
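Then, in serviceInstance1, I would have Publish enqueue the order in a BlockingCollection on which a dedicated thread is waiting. That thread does the actual sending. So whatever you choose to do in the Processor, you marshal the orders over to that thread, and everything is decoupled and in order.
You will probably also want to set BlockOptions according to your requirements.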
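
As one illustration of such options: a Dataflow block's input queue is unbounded by default, so a slow stage lets pending messages pile up in memory. Setting `BoundedCapacity` on `ExecutionDataflowBlockOptions` caps that queue. The sketch below is an assumption about what might suit this case, not a recommendation from the answer; the factory name `BoundedBlocks.CreateSender` and the default capacity of 10,000 are made up for the example.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class BoundedBlocks
{
    // Hypothetical factory: builds a sender block whose input queue is capped.
    public static ActionBlock<string> CreateSender(Action<string> send, int capacity = 10_000)
    {
        return new ActionBlock<string>(send, new ExecutionDataflowBlockOptions
        {
            BoundedCapacity = capacity,  // at most `capacity` items queued or being processed
            MaxDegreeOfParallelism = 1,  // one message at a time, in arrival order
        });
    }
}
```

With a bounded block, `Post` returns false when the block is full, so the caller has to check the result, whereas `SendAsync` waits asynchronously until there is room. Which behaviour is right (dropping, waiting, or alerting) depends on the requirements.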
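Mind that this is only a rough outline, not a complete solution. You may also want to go from there and minimize string operations, etc.
EDIT
Some more thoughts I have had since yesterday, in no particular order. Regarding the third edit in the question: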
Task t2 = Task.Factory.StartNew(() =>
            {
                while (true) { 
                    if (!concurrentQueue.IsEmpty)
                    {
                        JToken number;
                        while (concurrentQueue.TryDequeue(out number))
                        {
                            _rabbitMQ.PublishMessages(
                                Encoding.ASCII.GetBytes(number.ToString()),
                                "test"
                            );
                        }
                    } else
                    {
                        Thread.Sleep(1);
                    }
                }
            });  

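Really not a good idea.
First of all, I would do this inside the service class that handles the sending (serviceInstance1; I do not know its type). Then, you are busy-waiting in a tight loop while mixing TPL with Thread.Sleep. Those are two no-gos. It also completely defeats the purpose of a blocking queue, which is that the thread blocks until an item is available on the queue.
Maybe the best idea for now is to drop this part entirely and just let the final block in the pipeline call serviceInstance1.Publish. This may be premature optimization.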
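
To illustrate the point about blocking: with a `BlockingCollection<T>`, the consumer can iterate `GetConsumingEnumerable()`, which blocks while the queue is empty and ends once `CompleteAdding()` has been called and the queue is drained, so neither `IsEmpty` polling nor `Thread.Sleep` is needed. This is a minimal sketch; `QueueConsumer` and the `send` delegate are hypothetical stand-ins for the real publishing code.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

public sealed class QueueConsumer : IDisposable
{
    private readonly BlockingCollection<string> queue = new BlockingCollection<string>();
    private readonly Thread worker;

    public QueueConsumer(Action<string> send)
    {
        worker = new Thread(() =>
        {
            // Blocks while the queue is empty; the loop ends after CompleteAdding()
            // once all remaining items have been consumed.
            foreach (string message in queue.GetConsumingEnumerable())
            {
                send(message);
            }
        })
        { IsBackground = true };
        worker.Start();
    }

    // Called by the producer side, for example the last pipeline block.
    public void Enqueue(string message) => queue.Add(message);

    public void Dispose()
    {
        queue.CompleteAdding(); // let the worker drain the queue and exit
        worker.Join();
        queue.Dispose();
    }
}
```

Because a single thread consumes a FIFO queue, messages are sent in the order they were enqueued.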
EDIT 2
So, I did some experimenting yesterday and came up with this:
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.IO;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using System.Xml.Linq;
using System.Xml.Serialization;
using Newtonsoft.Json;
using System.Linq;

namespace DataFlowExperiment.PipelinesLib
{

    public class PipelineOne
    {
        private readonly IPipelineOneSteps steps;

        private readonly TransformBlock<string, XDocument> startBlock; // XML deserialize to Model
        private readonly TransformManyBlock<XDocument, string> toJsonMessagesBlock; // generate the JSON messages
        private readonly ITargetBlock<string> resultCallback;

        public PipelineOne(IPipelineOneSteps steps, ITargetBlock<string> resultCallback = null)
        {
            this.steps = steps;

            startBlock = new TransformBlock<string, XDocument>(steps.Start);
            toJsonMessagesBlock = new TransformManyBlock<XDocument, string>(steps.ToJson);

            this.resultCallback = resultCallback ?? DataflowBlock.NullTarget<string>();

            startBlock.LinkTo(toJsonMessagesBlock, new DataflowLinkOptions { PropagateCompletion = true });
            toJsonMessagesBlock.LinkTo(this.resultCallback, new DataflowLinkOptions { PropagateCompletion = true }, x => !string.IsNullOrEmpty(x));
            toJsonMessagesBlock.LinkTo(DataflowBlock.NullTarget<string>(), new DataflowLinkOptions { PropagateCompletion = true });
        }

        public void Post(string input)
        {
            startBlock.Post(input);
        }

        public Task Close()
        {
            startBlock.Complete();
            return resultCallback.Completion;
        }
    }

    public interface IPipelineOneSteps
    {
        public XDocument Start(string input);
        public IEnumerable<string> ToJson(XDocument doc);
    }

    public class PipelineOneSteps : IPipelineOneSteps
    {
        private readonly JsonSerializer jsonSerializer;

        public PipelineOneSteps()
        {
            jsonSerializer = JsonSerializer.CreateDefault();
        }

        public XDocument Start(string input)
        {
            XDocument doc = XDocument.Parse(input);
            return doc;
        }

        public IEnumerable<string> ToJson(XDocument doc)
        {
            XNamespace ns = "api-com";
            var orders = doc.Root.Elements(ns + "ORDER");

            foreach (var order in orders)
            {
                yield return JsonConvert.SerializeXNode(order);
            }
        }
    }
}

Benchmark results for this:

BenchmarkDotNet=v0.12.1, OS=Windows 10.0.19041.867 (2004/?/20H1)
Intel Core i9-10885H CPU 2.40GHz, 1 CPU, 16 logical and 8 physical cores
.NET Core SDK=5.0.202
  [Host]     : .NET Core 3.1.14 (CoreCLR 4.700.21.16201, CoreFX 4.700.21.16208), X64 RyuJIT
  DefaultJob : .NET Core 3.1.14 (CoreCLR 4.700.21.16201, CoreFX 4.700.21.16208), X64 RyuJIT



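|               Method |      N |        Mean |     Error |    StdDev | Gen 0 | Gen 1 | Gen 2 | Allocated |
|--------------------- |------- |------------:|----------:|----------:|------:|------:|------:|----------:|
| PipeLineOneBenchmark |   1000 |    25.00 μs |  0.269 μs |  0.252 μs |     - |     - |     - |         - |
| PipeLineOneBenchmark | 100000 | 2,491.42 μs | 13.655 μs | 15.177 μs |     - |     - |     - |         - |

This is similar to your solution, though not identical.

It does make me think, however, that the actual problem lies somewhere else. (Still working on it; I will update.)

I was thinking of a little utility to check whether your Rabbit is too slow and whether a significant backlog builds up: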

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks.Dataflow;

namespace DataFlowExperiment.PipelinesLib
{
    public class RabbitWrapper : IDisposable
    {
        private readonly int batchSize = 10;

        private Thread senderThread;
        private readonly BlockingCollection<string> messages;
        private readonly ActionBlock<string> receiver;
        private readonly CancellationTokenSource stoppingToken;
        private readonly RabbitWrapperStats stats;

        private ITargetBlock<string> Receiver => receiver;

        public RabbitWrapper()
        {
                                                                // Drop in your logging here
            stats = new RabbitWrapperStats(new Progress<string>(x => Console.WriteLine(x)));
            stoppingToken = new CancellationTokenSource();
            messages = new BlockingCollection<string>();
            receiver = new ActionBlock<string>(Receive);
            senderThread = new Thread(HandleQueue);
            senderThread.Start();
        }

        private void Receive(string message)
        {
            messages.Add(message);
        }

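        private void HandleQueue()
        {
            try
            {
                while (!stoppingToken.Token.IsCancellationRequested)
                {
                    int batchIndex = 0;
                    do {
                        // Blocks until a message is available or the token is cancelled.
                        string message = messages.Take(stoppingToken.Token);
                        if (!string.IsNullOrEmpty(message))
                        {
                            SendToRabbit(message);
                        }
                        batchIndex++;
                    } while (!stoppingToken.Token.IsCancellationRequested &&
                             batchIndex < batchSize &&
                             messages.Count > 0);
                    // Check statistics after each batch of up to batchSize (10) messages.
                    CheckStats(messages.Count);
                }
            }
            catch (OperationCanceledException)
            {
                // Take() throws when Close() cancels the token while waiting;
                // treat that as a normal shutdown instead of crashing the thread.
            }
        }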

        private void SendToRabbit(string message)
        {
            // rabbit Publish goes here.
        }

        private void CheckStats(int count)
        {
            stats.CheckStats(count);
        }

        public void Close()
        {
            this.stoppingToken.Cancel();
            senderThread.Join();
        }

        public void Dispose()
        {
            Close();
        }
    }

    internal class RabbitWrapperStats
    {
        // You may want to play around with these thresholds
        // I pulled them out of thin air ...
        const int SIZE_WARN = 500000;
        const int SIZE_CRITICAL = SIZE_WARN * 2;

        private int lastTenIndex = 0;
        private int[] lastTen = new int[10] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 };
        private int meanSizeLastTen = 0;
        private int lastMeanSize = 0;
        private int tendency = 0;

        private bool HasWarned = false;
        private bool HasPanicked = false;

        private readonly IProgress<string> progress;

        public RabbitWrapperStats(IProgress<string> progress)
        {
            this.progress = progress;
        }

        public void CheckStats(int queueSize)
        {
            UpdateLastTen(queueSize);

            if (!HasPanicked && queueSize > SIZE_CRITICAL)
            {
                Panic(queueSize);
                return;
            }

            if (!HasWarned && queueSize > SIZE_WARN)
            {
                Warn(queueSize);
                return;
            }

            if ((HasPanicked || HasWarned ) && meanSizeLastTen < SIZE_WARN * 0.75)
            {
                HasPanicked = false;
                HasWarned = false;
                progress?.Report($"INFO Mean size of last 10 Samples sinks below {SIZE_WARN * 0.75} : {meanSizeLastTen}");
            }
        }

        private void Warn(int size)
        {
            HasWarned = true;
            progress?.Report($"WARNING QueueSize = {size}");
        }

        private void Panic(int size)
        {
            HasPanicked = true;
            progress?.Report($"!! CRITICAL !! QueueSize = {size}");
        }

        private void UpdateLastTen(int value)
        {
            lastTen[lastTenIndex] = value;
            lastTenIndex = ++lastTenIndex % lastTen.Length;
            meanSizeLastTen = lastTen.Sum() / lastTen.Length;
            tendency = meanSizeLastTen.CompareTo(lastMeanSize);
            lastMeanSize = meanSizeLastTen;
        }
    }
}

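@R13mus This is an implementation of what I was talking about earlier. Each block (stage, or whatever you want to call it) does only what it needs to do and does it as fast as possible, which allows more parallelism. It may seem counterintuitive, but this usually reduces memory pressure, because resources are not shared and can be reclaimed quickly. It is also easier to reason about each block separately, which may lead to an "aha" moment if you really are leaking memory somewhere. - Kit
So, this does not work: jsonOrderFactory = new TransformManyBlock<JObject, JObject>((Func<JObject, JObject>)CreateOrderMessages); because I get the following error: Argument 1: cannot convert from 'System.Func<Newtonsoft.Json.Linq.JObject,Newtonsoft.Json.Linq.JObject>' to 'System.Func<Newtonsoft.Json.Linq.JObject,System.Collections.Generic.IEnumerable<Newtonsoft.Json.Linq.JObject>>'. - R13mus
I have some updates. First, I replaced the XML parsing and conversion to JSON with XmlSerializer (System.Xml.Serialization), deserializing the XML into objects. The problem with XmlDocument is that it loads all the objects into memory, which causes big problems (I will create Edit 4 to post my new code). Second, if I publish something like the plain string "test" to RMQ, I get a high publish rate (around 1750 msg/s). If I publish the whole order, it only publishes at 120 msg/s. One thing is not clear to me: when do I need to call startBlock.Complete()? Thanks! - R13mus
So I do not need to call Complete(), since I want my application to run for days, weeks, months if possible. Or only when I dispose the service. I have posted my answer in EDIT 4. I still need to try your EDIT 2 solution :) Thank you very much for your patience and help! - R13mus
Finally solved the problem with your help! The pipeline was very useful, and on top of that I switched back to System.Text.Json instead of Newtonsoft.Json. I also shortened the message length and added more filtering, and now I have a working solution. Cheers!! - R13mus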
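
The final code is not shown in the thread. As a rough illustration of the System.Text.Json switch mentioned in the last comment, and under the assumption that the published payload is a JSON rendering of the order object, the sketch below serializes directly to a UTF-8 byte array. `OrderMessage` and `OrderJson` are hypothetical names, and the two properties are a trimmed-down subset of the attributes seen in the sample XML.

```csharp
using System.Text.Json;

// Hypothetical, trimmed-down message type for illustration only.
public class OrderMessage
{
    public string PersistentOrderID { get; set; }
    public string Action { get; set; }
}

public static class OrderJson
{
    // Serializes straight to a UTF-8 byte[] that can be handed to BasicPublish,
    // skipping the intermediate string and the separate Encoding.GetBytes call.
    public static byte[] ToUtf8Bytes(OrderMessage order)
    {
        return JsonSerializer.SerializeToUtf8Bytes(order);
    }
}
```

Avoiding the intermediate string removes one allocation per message, which is in line with the earlier advice to minimize string operations.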