Python实时图像分类问题与神经网络

30
我正在尝试使用caffe和Python进行实时图像分类。我在一个进程中使用OpenCV从我的网络摄像头进行视频流,并在另一个进程中使用caffe对从网络摄像头获取的帧进行图像分类。然后,我将分类结果传回主线程以说明网络摄像头流。问题是,即使我拥有NVIDIA GPU并在GPU上执行caffe预测,主线程也会变慢。通常情况下,在不进行任何预测的情况下,我的网络摄像头流以30 fps运行;但是,使用预测后,最佳情况下我的网络摄像头流只有15 fps。我已确认caffe确实在进行预测时使用了GPU,并且我的GPU或GPU内存没有达到最大值。我还验证了我的CPU核心在程序执行期间没有达到最大值。我想知道我是否做错了什么,或者是否无法将这两个进程真正分离。欢迎任何建议。以下是我的代码供参考。
class Consumer(multiprocessing.Process):

    def __init__(self, task_queue, result_queue):
        multiprocessing.Process.__init__(self)
        self.task_queue = task_queue
        self.result_queue = result_queue
        #other initialization stuff

    def run(self):
        caffe.set_mode_gpu()
        caffe.set_device(0)
        #Load caffe net -- code omitted 
        while True:
            image = self.task_queue.get()
            #crop image -- code omitted
            text = net.predict(image)
            self.result_queue.put(text)

        return

import cv2
import caffe
import multiprocessing
import Queue 

tasks = multiprocessing.Queue()
results = multiprocessing.Queue()
consumer = Consumer(tasks,results)
consumer.start()

#Creating window and starting video capturer from camera
cv2.namedWindow("preview")
vc = cv2.VideoCapture(0)
#Try to get the first frame
if vc.isOpened():
    rval, frame = vc.read()
else:
    rval = False
frame_copy[:] = frame
task_empty = True
while rval:
    if task_empty:
       tasks.put(frame_copy)
       task_empty = False
    if not results.empty():
       text = results.get()
       #Add text to frame
       cv2.putText(frame,text)
       task_empty = True

    #Showing the frame with all the applied modifications
    cv2.imshow("preview", frame)

    #Getting next frame from camera
    rval, frame = vc.read()
    frame_copy[:] = frame
    #Getting keyboard input 
    key = cv2.waitKey(1)
    #exit on ESC
    if key == 27:
        break

我相信问题出在caffe预测上,因为当我注释掉预测部分,并让进程之间传递虚假文本时,帧率再次达到了30fps。

class Consumer(multiprocessing.Process):

    def __init__(self, task_queue, result_queue):
        multiprocessing.Process.__init__(self)
        self.task_queue = task_queue
        self.result_queue = result_queue
        #other initialization stuff

    def run(self):
        caffe.set_mode_gpu()
        caffe.set_device(0)
        #Load caffe net -- code omitted
        while True:
            image = self.task_queue.get()
            #crop image -- code omitted
            #text = net.predict(image)
            text = "dummy text"
            self.result_queue.put(text)

        return

import cv2
import caffe
import multiprocessing
import Queue 

tasks = multiprocessing.Queue()
results = multiprocessing.Queue()
consumer = Consumer(tasks,results)
consumer.start()

#Creating window and starting video capturer from camera
cv2.namedWindow("preview")
vc = cv2.VideoCapture(0)
#Try to get the first frame
if vc.isOpened():
    rval, frame = vc.read()
else:
    rval = False
frame_copy[:] = frame
task_empty = True
while rval:
    if task_empty:
       tasks.put(frame_copy)
       task_empty = False
    if not results.empty():
       text = results.get()
       #Add text to frame
       cv2.putText(frame,text)
       task_empty = True

    #Showing the frame with all the applied modifications
    cv2.imshow("preview", frame)

    #Getting next frame from camera
    rval, frame = vc.read()
    frame_copy[:] = frame
    #Getting keyboard input 
    key = cv2.waitKey(1)
    #exit on ESC
    if key == 27:
        break

但是预测应该在GPU上进行,对吧?那么为什么增加CPU使用率会在这种情况下有帮助呢?有点困惑。 - user3543300
我已经使用time.sleep(1)进行了测试,并没有在我的程序中遇到减速的情况。我已经以CPU_ONLY模式运行了caffe,并且注意到了更严重的减速。尽管单帧预测为什么会如此大程度地压力CPU我还不确定。 - user3543300
2
我曾经使用cuda-convnet进行非实时视频分析,CPU和GPU的负载都还不错。但是我没有分析过CPU的使用情况,也不知道哪部分是我的代码,哪部分是cuda-convnet的代码。我使用了批处理,单帧可能会导致更多的CPU开销,但是这只是我的直觉,也许是错误的。 :) - Ulrich Stern
我可能很天真,但你用的是哪个GPU?拥有GPU并不能保证快速预测,事实上,一块糟糕的GPU可能比CPU更慢。如果你想要实时预测,你需要一块相当不错的GPU(例如TitanX)。你能否测试一下net.predict(image)需要多长时间? - Imanol Luengo
你有计时过 cv2.waitKey(1) 吗?当你的代码只能达到15帧每秒时,这个调用中会发生一些“魔法”(事件处理)(参见文档),我曾经遇到过奇怪的交互。如果不是这个问题,你可以计时循环的其他部分(例如vc.read())来缩小可能导致15fps减速的语句范围。 - Ulrich Stern
显示剩余13条评论
4个回答

4
一些解释和重新思考:
我在一台配备英特尔酷睿i5-6300HQ @2.3GHz处理器,8 GB内存和NVIDIA GeForce GTX 960M GPU(2GB内存)的笔记本电脑上运行了以下代码,结果如下: 无论我是否运行了caffe(通过注释或不注释void Consumer::entry()中的net_output = this->net_->Forward(net_input)和其他必要内容),我总能在主线程中获得约30 fps。 类似的结果也在一台配备英特尔酷睿i5-4440处理器、8 GB内存和NVIDIA GeForce GT 630 GPU(1GB内存)的PC上得到了。 我在同样的笔记本电脑上运行了@user3543300问题中的代码,结果如下: 无论caffe是否在gpu上运行,我也可以获得约30 fps。 根据@user3543300的反馈,在运行caffe(在Nvidia GeForce 940MX GPU和Intel® Core™ i7-6500U CPU @ 2.50GHz × 4的笔记本电脑上)时,使用上述两个版本的代码只能获得约15 fps。当作为独立程序在gpu上运行caffe时,网络摄像头的帧率也会减慢。
所以我仍然认为问题可能主要在于硬件I / O限制,例如DMA带宽(关于 DMA的这个线程可能会提示)或RAM带宽。希望@user3543300可以检查此问题或找出我没有意识到的真正问题。
如果问题确实是我上面所想的那样,那么一个明智的想法就是减少CNN网络引入的内存I/O开销。实际上,为了解决嵌入式系统中硬件资源有限的类似问题,已经有一些关于这个主题的研究,例如量化 结构稀疏深度神经网络SqueezeNet深度压缩。因此,通过应用这些技巧,希望能够提高问题中网络摄像头的帧率。
尝试这个 C++ 解决方案。它使用线程来处理你的任务中的 I/O 开销,我使用 bvlc_alexnet.caffemodel 和 deploy.prototxt 进行图像分类测试,并没有看到主线程(网络摄像头流)明显变慢,当 Caffe 在 GPU 上运行时。
#include <stdio.h>
#include <iostream>
#include <string>
#include <boost/thread.hpp>
#include <boost/shared_ptr.hpp>
#include "caffe/caffe.hpp"
#include "caffe/util/blocking_queue.hpp"
#include "caffe/data_transformer.hpp"
#include "opencv2/opencv.hpp"

using namespace cv;

//Queue pair for sharing image/results between webcam and caffe threads
template<typename T>
class QueuePair {
  public:
    explicit QueuePair(int size);
    ~QueuePair();

    caffe::BlockingQueue<T*> free_;
    caffe::BlockingQueue<T*> full_;

  DISABLE_COPY_AND_ASSIGN(QueuePair);
};
template<typename T>
QueuePair<T>::QueuePair(int size) {
  // Initialize the free queue
  for (int i = 0; i < size; ++i) {
    free_.push(new T);
  }
}
template<typename T>
QueuePair<T>::~QueuePair(){
  T *data;
  while (free_.try_pop(&data)){
    delete data;
  }
  while (full_.try_pop(&data)){
    delete data;
  }
}
template class QueuePair<Mat>;
template class QueuePair<std::string>;

//Do image classification(caffe predict) using a subthread
class Consumer{
  public:
    Consumer(boost::shared_ptr<QueuePair<Mat>> task
           , boost::shared_ptr<QueuePair<std::string>> result);
    ~Consumer();
    void Run();
    void Stop();
    void entry(boost::shared_ptr<QueuePair<Mat>> task
             , boost::shared_ptr<QueuePair<std::string>> result);

  private:
    bool must_stop();

    boost::shared_ptr<QueuePair<Mat> > task_q_;
    boost::shared_ptr<QueuePair<std::string> > result_q_;

    //caffe::Blob<float> *net_input_blob_;
    boost::shared_ptr<caffe::DataTransformer<float> > data_transformer_;
    boost::shared_ptr<caffe::Net<float> > net_;
    std::vector<std::string> synset_words_;
    boost::shared_ptr<boost::thread> thread_;
};
Consumer::Consumer(boost::shared_ptr<QueuePair<Mat>> task
                 , boost::shared_ptr<QueuePair<std::string>> result) :
 task_q_(task), result_q_(result), thread_(){

  //for data preprocess
  caffe::TransformationParameter trans_para;
  //set mean
  trans_para.set_mean_file("/path/to/imagenet_mean.binaryproto");
  //set crop size, here is cropping 227x227 from 256x256
  trans_para.set_crop_size(227);
  //instantiate a DataTransformer using trans_para for image preprocess
  data_transformer_.reset(new caffe::DataTransformer<float>(trans_para
                        , caffe::TEST));

  //initialize a caffe net
  net_.reset(new caffe::Net<float>(std::string("/path/to/deploy.prototxt")
           , caffe::TEST));
  //net parameter
  net_->CopyTrainedLayersFrom(std::string("/path/to/bvlc_alexnet.caffemodel"));

  std::fstream synset_word("path/to/caffe/data/ilsvrc12/synset_words.txt");
  std::string line;
  if (!synset_word.good()){
    std::cerr << "synset words open failed!" << std::endl;
  }
  while (std::getline(synset_word, line)){
    synset_words_.push_back(line.substr(line.find_first_of(' '), line.length()));
  }
  //a container for net input, holds data converted from cv::Mat
  //net_input_blob_ = new caffe::Blob<float>(1, 3, 227, 227);
}
Consumer::~Consumer(){
  Stop();
  //delete net_input_blob_;
}
void Consumer::entry(boost::shared_ptr<QueuePair<Mat>> task
    , boost::shared_ptr<QueuePair<std::string>> result){

  caffe::Caffe::set_mode(caffe::Caffe::GPU);
  caffe::Caffe::SetDevice(0);

  cv::Mat *frame;
  cv::Mat resized_image(256, 256, CV_8UC3);
  cv::Size re_size(resized_image.cols, resized_image.rows);

  //for caffe input and output
  const std::vector<caffe::Blob<float> *> net_input = this->net_->input_blobs();
  std::vector<caffe::Blob<float> *> net_output;

  //net_input.push_back(net_input_blob_);
  std::string *res;

  int pre_num = 1;
  while (!must_stop()){
    std::stringstream result_strm;
    frame = task->full_.pop();
    cv::resize(*frame, resized_image, re_size, 0, 0, CV_INTER_LINEAR);
    this->data_transformer_->Transform(resized_image, *net_input[0]);
    net_output = this->net_->Forward();
    task->free_.push(frame);

    res = result->free_.pop();
    //Process results here
    for (int i = 0; i < pre_num; ++i){
      result_strm << synset_words_[net_output[0]->cpu_data()[i]] << " " 
                  << net_output[0]->cpu_data()[i + pre_num] << "\n";
    }
    *res = result_strm.str();
    result->full_.push(res);
  }
}

void Consumer::Run(){
  if (!thread_){
    try{
      thread_.reset(new boost::thread(&Consumer::entry, this, task_q_, result_q_));
    }
    catch (std::exception& e) {
      std::cerr << "Thread exception: " << e.what() << std::endl;
    }
  }
  else
    std::cout << "Consumer thread may have been running!" << std::endl;
};
void Consumer::Stop(){
  if (thread_ && thread_->joinable()){
    thread_->interrupt();
    try {
      thread_->join();
    }
    catch (boost::thread_interrupted&) {
    }
    catch (std::exception& e) {
      std::cerr << "Thread exception: " << e.what() << std::endl;
    }
  }
}
bool Consumer::must_stop(){
  return thread_ && thread_->interruption_requested();
}


int main(void)
{
  int max_queue_size = 1000;
  boost::shared_ptr<QueuePair<Mat>> tasks(new QueuePair<Mat>(max_queue_size));
  boost::shared_ptr<QueuePair<std::string>> results(new QueuePair<std::string>(max_queue_size));

  char str[100], info_str[100] = " results: ";
  VideoCapture vc(0);
  if (!vc.isOpened())
    return -1;

  Consumer consumer(tasks, results);
  consumer.Run();

  Mat frame, *frame_copy;
  namedWindow("preview");
  double t, fps;

  while (true){
    t = (double)getTickCount();
    vc.read(frame);

    if (waitKey(1) >= 0){
      consuer.Stop();
      break;
    }

    if (tasks->free_.try_peek(&frame_copy)){
      frame_copy = tasks->free_.pop();
      *frame_copy = frame.clone();
      tasks->full_.push(frame_copy);
    }
    std::string *res;
    std::string frame_info("");
    if (results->full_.try_peek(&res)){
      res = results->full_.pop();
      frame_info = frame_info + info_str;
      frame_info = frame_info + *res;
      results->free_.push(res);
    }    

    t = ((double)getTickCount() - t) / getTickFrequency();
    fps = 1.0 / t;

    sprintf(str, " fps: %.2f", fps);
    frame_info = frame_info + str;

    putText(frame, frame_info, Point(5, 20)
         , FONT_HERSHEY_SIMPLEX, 0.5, Scalar(0, 255, 0));
    imshow("preview", frame);
  }
}

src/caffe/util/blocking_queue.cpp中,做出以下小改动并重新构建caffe:
...//Other stuff
template class BlockingQueue<Batch<float>*>;
template class BlockingQueue<Batch<double>*>;
template class BlockingQueue<Datum*>;
template class BlockingQueue<shared_ptr<DataReader::QueuePair> >;
template class BlockingQueue<P2PSync<float>*>;
template class BlockingQueue<P2PSync<double>*>;
//add these 2 lines below
template class BlockingQueue<cv::Mat*>;
template class BlockingQueue<std::string*>;

这看起来很有趣。我会试一下并回报结果。只有一个问题,我如何将 cv::Mat 作为 C++ 中 caffe 网络的输入传递?另外,当我调用预训练网络时,是否有像 Python 中那样的 raw_scalechannel_swap 参数?我以前从未使用过 C++ 版本的 caffe。 - user3543300
@user3543300 在 data_transformer.cpp 中,接口 DataTransformer<Dtype>::Transform(const cv::Mat& cv_img, Blob<Dtype>* transformed_blob) 将把 cv::Mat 转换为 caffe::Blob 对象,并通过调用 Net::Forward(const vector<Blob<Dtype>*> & bottom, Dtype* loss) 将其作为输入传递给 caffe 网络。DataTransformer::Transform() 将自动在其中执行 channel_swap 过程,但如果要将图像数据从 [0,255] 规范化到 [0,1],则应在 caffe::DataTransformer 中使用成员函数 set_scale(float value) 显式设置比例。 - Dale
2
我运行了代码,我的fps再次降至约15。不确定发生了什么。我有一块Nvidia GeForce 940MX GPU和Intel® Core™ i7-6500U CPU @ 2.50GHz × 4。 - user3543300
@user3543300 是显卡内存带宽最重要的吗? - Dale
不确定。我的GPU内存有2 GB。你能在主线程上达到30 fps吗? - user3543300
显示剩余2条评论

2
似乎caffe的Python封装会阻塞全局解释器锁(GIL)。因此,调用任何caffe Python命令都会阻塞所有 Python线程。
一个解决方法(自担风险)是为特定的caffe函数禁用GIL。例如,如果您想要能够在没有锁定的情况下运行forward,可以编辑{{link2:$CAFFE_ROOT/python/caffe/_caffe.cpp}}。添加此函数:
void Net_Forward(Net<Dtype>& net, int start, int end) {
  Py_BEGIN_ALLOW_THREADS;   // <-- disable GIL
  net.ForwardFromTo(start, end);
  Py_END_ALLOW_THREADS;     // <-- restore GIL
}

.def("_forward", &Net<Dtype>::ForwardFromTo) 替换为:
.def("_forward", &Net_Forward)

不要忘记在更改后运行make pycaffe
有关更多详细信息,请参见this

GIL是否适用于多进程?因为我在这个示例程序中使用的是多进程而不是多线程。 - user3543300
@user3543300,老实说,我不太清楚。我主要处理多线程而不是多进程。我观察到在多进程中也有类似的表现,但尚未在多进程条件下验证此解决方案。 - Shai

0
尝试使用多线程方法而不是多进程。生成进程比生成线程慢。一旦它们在运行,差别不是很大。在您的情况下,我认为使用线程方法会更加有益,因为涉及到许多帧数据。

2
Python有一个GIL锁,只能同时运行一个线程,所以我不确定这是否是并行和速度最佳的选择。 - user3543300
http://stackoverflow.com/questions/32899077/is-it-possible-to-read-webcam-frames-in-parallel - MD. Nazmul Kibria
我只会生成每个进程一次,所以我不确定这是否会有所不同。我之前尝试过多线程方法,但实际上它使一切变慢了。这里有一个很好的解释:https://wiki.python.org/moin/GlobalInterpreterLock。 - user3543300

0
你的代码可能会出现这样一种情况,即在第一次调用时以gpu模式工作,而在后续调用中则以cpu模式计算分类,因为它是默认模式。在旧版本的caffe中,设置一次gpu模式就足够了,但在新版本中,每次都需要设置模式。你可以尝试以下更改:
def run(self):

        #Load caffe net -- code omitted 
        while True:
            caffe.set_mode_gpu()
            caffe.set_device(0)
            image = self.task_queue.get()
            #crop image -- code omitted
            text = net.predict(image)
            self.result_queue.put(text)

        return

同时,请在消费者线程运行时查看GPU计时。您可以使用以下命令来进行Nvidia:

nvidia-smi

以上命令将在运行时显示您的GPU利用率。

如果它无法解决问题,另一个解决方案是将OpenCV帧提取代码放在一个线程下。由于它涉及I/O和设备访问,您可能会从GUI线程/主线程中分离出来运行并获得好处。该线程将把帧推入队列,当前的消费者线程将进行预测。在这种情况下,请小心处理带有关键块的队列。


我尝试了你提出的两个建议,但没有看到任何改进。我使用nvidia x服务器设置(在ubuntu上),在每次调用set_mode_gpu后显式地查看gpu利用率,并看到gpu利用率跳至99%。然而,正如你建议的那样,我将我的帧提取过程和GUI显示过程分别作为两个进程(都不是主程序),但没有看到任何性能提升。事实上,我认为我的CPU使用率可能略有增加。 - user3543300
一个单独帧在GPU上分类需要多少时间? - MD. Nazmul Kibria
大约0.15秒。 - user3543300
每个预测需要0.15秒,因此您无法每秒处理超过6帧。即使使用线程进行预测,如果您试图处理30帧每秒,仍会有持续的延迟。我不确定您是否正在使用cudnn。如果没有,您可以使用它。它比仅GPU模式加速速度。 - MD. Nazmul Kibria
让我们在聊天中继续这个讨论 - MD. Nazmul Kibria
显示剩余6条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接