高CPU Azure Worker角色

7

这是一个相当广泛的问题,但我已经没有想法了。我们目前运行两个工作角色实例,执行以下操作:

  • 通过为每个批次生成N个线程来监视和处理IoT Hub事件。
  • 监视和处理来自IoT Hub的连接/断开(操作监视)消息
  • 执行一些服务总线工作(主题和队列)
  • 通过NLOG将日志写入SQL、DocDB(Mongo API)和Azure表存储
  • 通过IoT Hub发送云到设备消息

我们面临的问题是在高峰期间,我们的CPU显然会增加,但很遗憾它从未下降过,并且经常会飙升到100%并坐在那里,直到我重新启动实例才能将其降下来。虽然无法确定原因,但我仍在查看线程。现在让我们进入代码...

WorkerRole.cs中:

    class WorkerRole : RoleEntryPoint
    {
        private readonly ManualResetEvent runCompleteEvent = new ManualResetEvent(false);

        public override void Run()
        {
            _eventprocessor.Start(instanceId, instanceIndex);//.Wait(-1);

            //Wait for shutdown to be called, else the role will recycle
            this.runCompleteEvent.WaitOne();
        }
    }

EventProcessor.cs 文件中: 我会尽量省略冗长的内容,但加入我认为有价值的部分。尽可能添加"伪代码"。
public class EventProcessor : IEventProcessor
{
  private readonly ManualResetEvent runCompleteEvent = new ManualResetEvent(false);

  public async Task Start(string serviceId, int InstanceIndex)
  {

    //Setup Topic

    //Setup Queue

    //Setup EventProcessorHost for receiving events and operations monitoring and start listening

    //Receiving cloud to device feedback from service
    ReceiveFeedbackAsync();

    runCompleteEvent.WaitOne();
  }

  async Task IEventProcessor.ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
  {
        if (messages.Count() > 0)
        {
            if (!_cancellationSource.IsCancellationRequested)
            {
                await ProcessEventsBulk(context, messages);
            }
        }

        if (messages.Count() > 0)
        {
            await context.CheckpointAsync();               
        }
   }

  async Task ProcessEventsBulk(PartitionContext context, IEnumerable<EventData> messages)
        {
            List<Task> TaskList = new List<Task>();
            foreach (EventData message in messages)
            {
                var LastTask = Task.Run(() => GoBoy(context, message));
                TaskList.Add(LastTask);
            }
            await Task.WhenAll(TaskList);
        }

    async Task GoBoy(PartitionContext context, EventData message)
    {
        try
        {
            using (var db = new AppDbContext(_dbContextConnectionString))
            {
                await ProcessEvent(message, context.Lease.PartitionId, new CoreManagerContainer(db), db);
                await db.SaveChangesAsync();
            }
        }
        catch (Exception e)
        {
           //Do Some stuff...
        }
    }

  private async void ReceiveFeedbackAsync()
    {
        var feedbackReceiver = serviceClientReceiver.GetFeedbackReceiver();
        while (true)
        {
            try
            {
              var feedbackBatch = await feedbackReceiver.ReceiveAsync();
              if (feedbackBatch == null) continue;
              foreach (var records in feedbackBatch.Records)
              {

              }
              await feedbackReceiver.CompleteAsync(feedbackBatch);
            }
            catch (Exception)
            {
              Thread.Sleep(30000);                    
            }
         }

    }

}

如果有任何额外需要,请不要犹豫,尽管问。我非常感谢任何帮助。
重新启动工作程序后,这里显示了CPU下降enter image description here
微软支持协助我进行一些PerfViews和ProcDumps。结果是我们应该查看调用我们的中心“https://abcxyz.azure-devices.net:443/$iothub/websocket”的线程。这就是为什么我决定添加ReceiveFeedbackAsync()方法,因为我知道它依赖于与我们的中心保持永久连接以收集反馈。
从我所看到的,我们正确地注册了我们的EVPH,但如果有人想查看那段代码,请告诉我。

2
如果你的 feedbackReceiver 由于某些条件而不断返回 null,则你会得到一个完美的“while(true);”循环。 - Ton Plooij
@TonPlooij 谢谢回复,我确实考虑过这个,只是它是推荐的解决方案。请参阅 https://learn.microsoft.com/en-us/azure/iot-hub/iot-hub-csharp-csharp-c2d 的“接收传递反馈”部分。 - David
1
一些想法:无检查点,例如每1000条消息或基于定时器间隔,比如一分钟。摆脱ORM并使用纯ado.net。根据接收到的消息数量,这可能会有所帮助。特别是ORM可能会因其转换为对象而影响性能。 - Peter Bons
@ObiEff 谢谢,是的,我也已经做出了那个更改。目前为止一切顺利!还有48小时来确认。令人兴奋! - David
在你的 while(true) 循环中,尝试添加 Thread.SpinWait(100)。 - King Jk
显示剩余5条评论
1个回答

0
你是否已经逐步执行代码并确保没有创建无限循环条件,而这些条件不会抛出任何异常,以便让你的Thead.Sleep得以执行。由于你的代码中期望Sleep,最好避免使用异常来触发它。也许可以在处理每批反馈后编写代码来进行Sleep。异常用于错误处理和特殊情况,而不是帮助控制逻辑流程。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接