多线程的度量指标

3

这似乎是一个非常普遍的使用案例,也许我想多了,但我在处理来自多个线程的集中度量时遇到了问题。假设我有多个工作线程正在处理记录,并且每处理1000个记录我都想输出一些指标。现在,我可以让每个线程记录单独的指标,但是要得到吞吐量数字,我必须手动将它们加起来(当然时间边界不会完全相同)。以下是一个简单的例子:

public class Worker implements Runnable {

   private static int count = 0;
   private static long processingTime = 0;

   public void run() {
       while (true) {
          ...get record
          count++;
          long start = System.currentTimeMillis();
          ...do work
          long end = System.currentTimeMillis();
          processingTime += (end-start);
          if (count % 1000 == 0) {
              ... log some metrics
              processingTime = 0;
              count = 0;
          }
       }
    }
}

希望这有些意义。我知道这两个静态变量可能是AtomicInteger和AtomicLong,但也可能不是。我对人们的想法很感兴趣。我曾考虑过使用原子变量并使用ReentrantReadWriteLock,但我真的不希望指标停止处理流程(即指标对处理的影响应该非常小)。谢谢。

3个回答

4

将实际处理工作转移到另一个线程可能是个好主意。这个想法是尽快封装您的数据并将其移交给一个处理线程,以便最小化对正在执行有意义工作的线程的影响。

虽然存在一些移交争用,但通常这种成本远小于任何其他类型的同步,因此在许多情况下应该是一个很好的选择。我认为M. Jessup的解决方案与我的非常接近,但希望以下代码能够清晰地说明这一点。

public class Worker implements Runnable {

   private static final Metrics metrics = new Metrics();

   public void run() {
      while (true) {
        ...get record
        long start = System.currentTimeMillis();
        ...do work
        long end = System.currentTimeMillis();
        // process the metric asynchronously
        metrics.addMetric(end - start);
     }
  }

  private static final class Metrics {
     // a single "background" thread that actually handles
     // processing
     private final ExecutorService metricThread = 
           Executors.newSingleThreadExecutor();
     // data (no synchronization needed)
     private int count = 0;
     private long processingTime = 0;

     public void addMetric(final long time) {
        metricThread.execute(new Runnable() {
           public void run() {
              count++;
              processingTime += time;
              if (count % 1000 == 0) {
                 ... log some metrics
                 processingTime = 0;
                 count = 0;
              }
           }
        });
      }
   }
}

+1 但我总是对这样的解决方案感到疑惑。因为你实际上实现了你想要的吗?也就是说,从操作系统中进行线程抢占以允许此日志记录的成本是否会减少其他工作线程实现的指标(仅当number_of_working_threads > cores_on_machine时才是一个有效的问题)。与其在已经获得指标的同一线程上执行,不如在另一个线程上执行。 - John Vint
真的,在特定条件下这是最好的。我会稍微修改你的条件为number_of_CPU_busy_threads > cores_on_machine。实际上,很多线程在大部分处理时间处于空闲状态(被I/O阻塞等)。在大多数情况下,真正活跃的线程数量不会超过核心数量,否则你实际上已经超出了CPU容量。如果操作的性质是“串行化”的(例如,记录到文件),这种模式也很有效。 - sjlee
我真的很喜欢这个答案 - 一开始无法确定它会如何影响处理时间,但我的猜测是指标线程将保持相当繁忙,而不会实际影响运行记录处理器。 - Gandalf

2
我建议,如果您不希望日志记录干扰处理过程,您应该有一个单独的日志工作线程,并让您的处理线程仅提供某种类型的值对象,以便可以移交。在这个例子中,我选择了一个LinkedBlockingQueue,因为它具有使用offer()阻止微不足道的时间的能力,并且您可以将阻塞延迟到从队列中提取值的另一个线程中。根据您的要求,MetricProcessor可能需要增加数据排序等更多逻辑,但即使它是一个长时间运行的操作,它也不会阻止VM线程调度程序在此期间重新启动真正的处理线程。
public class Worker implements Runnable {

  public void run() {
    while (true) {
      ... do some stuff
      if (count % 1000 == 0) {
        ... log some metrics
        if(MetricProcessor.getInstance().addMetrics(
            new Metrics(processingTime, count, ...)) {
          processingTime = 0;
          count = 0;
        } else {
          //the call would have blocked for a more significant
          //amount of time, here the results
          //could be abandoned or just held and attempted again
          //as a larger data set later
        }
      }
    }
  }
}

public class WorkerMetrics {
  ...some interesting data
  public WorkerMetrics(... data){
    ...
  }
  ...getter setters etc
}

public class MetricProcessor implements Runnable {
  LinkedBlockingQueue metrics = new LinkedBlockingQueue();
  public boolean addMetrics(WorkerMetrics m) {
    return metrics.offer(m); //This may block, but not for a significant amount of time.
  }

  public void run() {
    while(true) {
      WorkMetrics m = metrics.take(); //wait here for something to come in
      //the above call does all the significant blocking without
      //interrupting the real processing
      ...do some actual logging, aggregation, etc of the metrics
    }
  }
}

LinkedBlockingQueue 绝对会在 add 操作上阻塞。你是不是想使用 ConcurrentLinkedQueue?drainTo 操作也会阻塞。添加线程的情况规避了原始问题,无论你做什么,多线程都会有某种形式的串行化。在我看来,增加另一个线程没有用处。 - John Vint
是的,它会在offer上阻塞,但与进行完整度量记录可能阻塞的任何时间相比,它阻塞的时间微不足道。 OP发表了“但我真的不希望指标停止处理流”的声明,因此在这里我们委托以允许处理流程继续。当然,drainTo会阻塞,但如果它在度量处理中,那么这并不重要,因为它与主处理分开。如果您没有另一个线程,如何在不长时间阻塞一个或多个处理线程的情况下进行序列化? - M. Jessup
如果有大量的工作要为记录指标而完成,你可以提出使用另一个线程。但是,你应该更新你的回答并删除那些声称add、offer和take不会阻塞的语句。这并没有帮助OP,因为这是不准确的。 - John Vint
收到意见,回答已更新,以更清晰地表明提供可能会短暂地被阻塞。 - M. Jessup
基本上我认为这与顶部答案相同 - 在后台,Executor服务使用类似于此的BlockingQueue - 但在幕后隐藏了一些复杂性。一个好答案,但我认为另一个略微更优雅。 - Gandalf

1
如果您依赖计数和处理时间的状态同步,那么您必须使用锁。例如,如果当++count%1000 == 0为真时,您想在那个时间评估处理时间的指标。
对于这种情况,使用ReentrantLock是有意义的。我不会使用RRWL,因为没有纯读取发生的实例。它始终是一个读/写集。但您需要在所有内容周围进行锁定。
  count++
  processingTime += (end-start);
  if (count % 1000 == 0) {
      ... log some metrics
      processingTime = 0;
      count = 0;
  }

无论 count++ 是否在该位置,您也需要在其周围加锁。最后,如果您正在使用 Lock,则不需要 AtomicLong 和 AtomicInteger。这只会增加开销并且不会更安全。

我相信我在实际问题中已经说过了。 - Gandalf
当你写下“但也许不是”时,我认为你的意思是你会承受并发性能损失,而不使用AtomicLong :) - John Vint

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接