我有一个Flask应用程序,它从相机读取帧并将其流式传输到网站。
Camera.py
from threading import Thread
from copy import deepcopy
import queue
import cv2
class Camera(Thread):
def __init__(self, cam, normalQue, detectedQue):
Thread.__init__(self)
self.__cam = cam
self.__normalQue = normalQue
self.__detectedQue = detectedQue
self.__shouldStop = False
def __del__(self):
self.__cam.release()
print('Camera released')
def run(self):
while True:
rval, frame = self.__cam.read()
if rval:
frame = cv2.resize(frame, None, fx=0.5, fy=0.5, interpolation=cv2.INTER_AREA)
_, jpeg = cv2.imencode('.jpg', frame)
self.__normalQue.put(jpeg.tobytes())
self.__detectedQue.put(deepcopy(jpeg.tobytes()))
if self.__shouldStop:
break
def stopCamera(self):
self.__shouldStop = True
从你所看到的,我只是读取帧,改变大小并将其存储在两个不同的队列中。没有太复杂的东西。我还有两个类负责MJPEG流:
NormalVideoStream.py
from threading import Thread
import traceback
import cv2
class NormalVideoStream(Thread):
def __init__(self, framesQue):
Thread.__init__(self)
self.__frames = framesQue
self.__img = None
def run(self):
while True:
if self.__frames.empty():
continue
self.__img = self.__frames.get()
def gen(self):
while True:
try:
if self.__img is None:
print('Normal stream frame is none')
continue
yield (b'--frame\r\n'
b'Content-Type: image/jpeg\r\n\r\n' + self.__img + b'\r\n')
except:
traceback.print_exc()
print('Normal video stream genenation exception')
and
DetectionVideoStream.py
from threading import Thread
import cv2
import traceback
class DetectionVideoStream(Thread):
def __init__(self, framesQue):
Thread.__init__(self)
self.__frames = framesQue
self.__img = None
self.__faceCascade = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_frontalface_default.xml')
def run(self):
while True:
if self.__frames.empty():
continue
self.__img = self.__detectFace()
def gen(self):
while True:
try:
if self.__img is None:
print('Detected stream frame is none')
yield (b'--frame\r\n'
b'Content-Type: image/jpeg\r\n\r\n' + self.__img + b'\r\n')
except:
traceback.print_exc()
print('Detection video stream genenation exception')
def __detectFace(self):
retImg = None
try:
img = self.__frames.get()
gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
faces = self.__faceCascade.detectMultiScale(gray, 1.1, 4)
for (x, y, w, h) in faces:
cv2.rectangle(img, (x, y), (x + w, y + h), (255, 0, 0), 2)
(_, encodedImage) = cv2.imencode('.jpg', img)
retImg = encodedImage.tobytes()
except:
traceback.print_exc()
print('Face detection exception')
return retImg
从两个流中可以看出,我正在读取相机帧并在无限循环中将其插入队列。这两个类还有一个gen()方法,用于生成帧并将其发送到自身的网站。唯一的区别是,在检测流中,我还进行了人脸识别。
现在,在我的主文件中:
main.py
from flask import Blueprint, render_template, Response, abort, redirect, url_for
from flask_login import login_required, current_user
from queue import Queue
from . import db
from .Camera import Camera
from .NormalVideoStream import NormalVideoStream
from .DetectionVideoStream import DetectionVideoStream
from .models import User
import cv2
main = Blueprint('main', __name__)
# Queues for both streams
framesNormalQue = Queue(maxsize=0)
framesDetectionQue = Queue(maxsize=0)
print('Queues created')
# RPi camera instance
camera = Camera(cv2.VideoCapture(0), framesNormalQue, framesDetectionQue)
camera.start()
print('Camera thread started')
# Streams
normalStream = NormalVideoStream(framesNormalQue)
detectionStream = DetectionVideoStream(framesDetectionQue)
print('Streams created')
normalStream.start()
print('Normal stream thread started')
detectionStream.start()
print('Detection stream thread started')
@main.route('/')
def index():
return render_template('index.html')
@main.route('/profile', methods=["POST", "GET"])
def profile():
if not current_user.is_authenticated:
abort(403)
return render_template('profile.html', name=current_user.name, id=current_user.id, detectionState=current_user.detectionState)
@main.route('/video_stream/<int:stream_id>')
def video_stream(stream_id):
if not current_user.is_authenticated:
abort(403)
print(f'Current user detection: {current_user.detectionState}')
global detectionStream
global normalStream
stream = None
if current_user.detectionState:
stream = detectionStream
print('Stream set to detection one')
else:
stream = normalStream
print('Stream set to normal one')
return Response(stream.gen(), mimetype='multipart/x-mixed-replace; boundary=frame')
@main.route('/detection')
def detection():
if not current_user.is_authenticated:
abort(403)
if current_user.detectionState:
current_user.detectionState = False
else:
current_user.detectionState = True
user = User.query.filter_by(id=current_user.id)
user.detectionState = current_user.detectionState
db.session.commit()
return redirect(url_for('main.profile', id=current_user.id, user_name=current_user.name))
@main.errorhandler(404)
def page_not_found(e):
return render_template('404.html'), 404
@main.errorhandler(403)
def page_forbidden(e):
return render_template('403.html'), 403
我正在全局创建相机(camera)、问题(ques)和流(streams)对象。当用户登录网站时,他将能够看到实时视频流,并有一个按钮可以更改当前呈现的流。
整个项目运行良好,唯一的问题是:当我将流更改为检测流时,会有巨大的延迟(大约10/15秒),这使得整个应用程序无法正常工作。我尝试自己搜索错误/优化,但找不到任何东西。我特意在单独的线程上运行所有东西以减轻应用程序的负担,但似乎这还不够。1-2秒的延迟水平是可以接受的,但不是10+秒。因此,各位,也许您们能看到某些错误?或者知道如何进行优化吗?
此外,需要提及的是,整个应用程序都在RPi 4B 4GB上运行,我在桌面上访问网站。默认服务器已更改为Nginx和Gunicorn。从我所看到的,当应用程序工作时,Pi的CPU使用率达到了100%。在测试默认服务器时,行为相同。猜测1.5 GHz的CPU有足够的功率来使其运行更加流畅。