我正在处理原始的RGB帧,将它们编码成h264格式,然后再将其解码回到原始的RGB帧。
[RGB frame] ------ encoder ------> [h264 stream] ------ decoder ------> [RGB frame]
              ^               ^                    ^               ^
        encoder_write   encoder_read         decoder_write   decoder_read
我希望尽快获取解码帧,但无论等待多久,似乎总是有一帧的延迟。¹ 在这个例子中,我每2秒向编码器送入一帧:
$ python demo.py 2>/dev/null
time=0 frames=1 encoder_write
time=2 frames=2 encoder_write
time=2 frames=1 decoder_read <-- decoded output is delayed by extra frame
time=4 frames=3 encoder_write
time=4 frames=2 decoder_read
time=6 frames=4 encoder_write
time=6 frames=3 decoder_read
...
What I want instead:
$ python demo.py 2>/dev/null
time=0 frames=1 encoder_write
time=0 frames=1 decoder_read <-- decode immediately after encode
time=2 frames=2 encoder_write
time=2 frames=2 decoder_read
time=4 frames=3 encoder_write
time=4 frames=3 decoder_read
time=6 frames=4 encoder_write
time=6 frames=4 decoder_read
...
编码器和解码器ffmpeg进程使用以下参数运行:
encoder: ffmpeg -f rawvideo -pix_fmt rgb24 -s 224x224 -i pipe: \
-f h264 -tune zerolatency pipe:
decoder: ffmpeg -probesize 32 -flags low_delay \
-f h264 -i pipe: \
-f rawvideo -pix_fmt rgb24 -s 224x224 pipe:
以下是完整的可再现示例。不需要外部视频文件。只需复制、粘贴并运行 python demo.py 2>/dev/null 即可!
import subprocess
from queue import Queue
from threading import Thread
from time import sleep, time
import numpy as np
# Frame geometry in pixels; the ffmpeg commands in this file pass the same
# size as "-s 224x224".
WIDTH, HEIGHT = 224, 224
# Number of synthetic frames generated for the encode/decode round trip.
NUM_FRAMES = 256
def t(epoch=time()):
    """Return the whole number of seconds elapsed since ``epoch``.

    The default ``epoch`` is evaluated once, when the function is defined,
    so every argument-less call shares the same time origin.

    Args:
        epoch: Reference timestamp in seconds, as returned by ``time()``.

    Returns:
        Elapsed seconds, truncated to ``int``.
    """
    return int(time() - epoch)
def make_frames(num_frames):
    """Build a deterministic stack of synthetic RGB frames.

    Each row of the base pattern is the ramp ``0 .. WIDTH - 1``. Channels 0
    and 2 hold the ramp as-is and channel 1 holds it mirrored left-to-right.
    Frame ``k`` (0-based) is the base pattern multiplied by ``k + 1`` in
    uint8 arithmetic, so values wrap modulo 256.

    Args:
        num_frames: Number of frames to generate.

    Returns:
        A uint8 array of shape ``(num_frames, HEIGHT, WIDTH, 3)``.
    """
    x = np.arange(WIDTH, dtype=np.uint8)
    x = np.broadcast_to(x, (num_frames, HEIGHT, WIDTH))
    # repeat() returns a fresh array, so the result can be modified in place
    # (the broadcast view above is read-only).
    x = x[..., np.newaxis].repeat(3, axis=-1)
    # Mirror channel 1 horizontally so the channels are not all identical.
    x[..., 1] = x[:, :, ::-1, 1]
    # Per-frame gain 1, 2, 3, ... reduced modulo 256 explicitly instead of
    # asking arange for uint8 values beyond the uint8 range.
    scale = (np.arange(1, len(x) + 1) % 256).astype(np.uint8)
    scale = scale[:, np.newaxis, np.newaxis, np.newaxis]
    x *= scale
    return x
def encoder_write(writer):
    """Feed the encoder raw frames, one every two seconds.

    Args:
        writer: Writable binary stream that receives the raw frame bytes.
            It is always closed before this function exits, including when
            writing fails, so the consumer sees end-of-input.
    """
    try:
        frames = make_frames(num_frames=NUM_FRAMES)
        for i, frame in enumerate(frames):
            writer.write(frame.tobytes())
            # Flush right away so the frame is not held back in Python's
            # own write buffer.
            writer.flush()
            print(f"time={t()} frames={i + 1:<3} encoder_write")
            sleep(2)
    finally:
        writer.close()
def encoder_read(reader, queue):
    """Forward chunks of encoded bytes from ``reader`` into ``queue``.

    Reads until ``reader.read1()`` returns an empty result, then puts
    ``None`` on the queue as an end-of-stream marker.

    Args:
        reader: Binary stream exposing ``read1()``.
        queue: Queue that receives each chunk, followed by ``None``.

    Raises:
        Whatever ``reader.read1()`` raises. The ``None`` marker is still
        queued in that case so the consumer does not block forever.
    """
    try:
        while chunk := reader.read1():
            queue.put(chunk)
            # Uncomment to trace chunk sizes:
            # print(f"time={t()} chunk={len(chunk):<4} encoder_read")
    finally:
        queue.put(None)
def decoder_write(writer, queue):
    """Feed the decoder the byte chunks taken from ``queue``.

    Chunks are written and flushed one at a time until a falsy item
    (the ``None`` end-of-stream marker) is dequeued.

    Args:
        writer: Writable binary stream that receives the chunks. It is
            always closed before this function exits, including when a
            write fails.
        queue: Queue yielding ``bytes`` chunks followed by ``None``.
    """
    try:
        while chunk := queue.get():
            writer.write(chunk)
            # Flush per chunk so data is handed on without waiting for
            # Python's write buffer to fill.
            writer.flush()
            # Uncomment to trace chunk sizes:
            # print(f"time={t()} chunk={len(chunk):<4} decoder_write")
    finally:
        writer.close()
def decoder_read(reader):
    """Retrieve decoded frames and report each one's PSNR.

    Bytes from ``reader`` are accumulated until a full
    ``HEIGHT * WIDTH * 3`` byte frame is available. The i-th decoded frame
    is compared against the i-th frame from ``make_frames`` and one line
    is printed per frame.

    Args:
        reader: Binary stream exposing ``read1()`` that yields raw RGB
            frame bytes.
    """
    frame_len = HEIGHT * WIDTH * 3
    targets = make_frames(num_frames=NUM_FRAMES)
    buffer = bytearray()
    i = 0
    while chunk := reader.read1():
        buffer += chunk
        while len(buffer) >= frame_len:
            # Slicing copies the bytes, so the buffer can be trimmed freely.
            frame = np.frombuffer(buffer[:frame_len], dtype=np.uint8)
            frame = frame.reshape(HEIGHT, WIDTH, 3)
            del buffer[:frame_len]
            # Widen before subtracting: uint8 differences and squares wrap
            # modulo 256 and would corrupt the error estimate.
            error = frame.astype(np.float64) - targets[i]
            mse = np.mean(error**2)
            # Identical frames have zero error; report infinite PSNR
            # instead of dividing by zero.
            psnr = 10 * np.log10(255**2 / mse) if mse > 0 else float("inf")
            i += 1
            print(f"time={t()} frames={i:<3} decoder_read psnr={psnr:.1f}")
# Encoder process: raw rgb24 frames on stdin, h264 stream on stdout.
cmd = (
    "ffmpeg "
    "-f rawvideo -pix_fmt rgb24 -s 224x224 "
    "-i pipe: "
    "-f h264 "
    "-tune zerolatency "
    "pipe:"
)
encoder_process = subprocess.Popen(
    cmd.split(), stdin=subprocess.PIPE, stdout=subprocess.PIPE
)
# Decoder process: h264 stream on stdin, raw rgb24 frames on stdout.
cmd = (
    "ffmpeg "
    "-probesize 32 "
    "-flags low_delay "
    "-f h264 "
    "-i pipe: "
    "-f rawvideo -pix_fmt rgb24 -s 224x224 "
    "pipe:"
)
decoder_process = subprocess.Popen(
    cmd.split(), stdin=subprocess.PIPE, stdout=subprocess.PIPE
)
# Carries encoded chunks from the encoder's stdout to the decoder's stdin.
queue = Queue()
# One thread per pipe end, so no stage waits on another stage's blocking I/O.
threads = [
    Thread(target=encoder_write, args=(encoder_process.stdin,),),
    Thread(target=encoder_read, args=(encoder_process.stdout, queue),),
    Thread(target=decoder_write, args=(decoder_process.stdin, queue),),
    Thread(target=decoder_read, args=(decoder_process.stdout,),),
]
for thread in threads:
    thread.start()
# Wait for the pipeline to drain, then release the pipes and reap both
# ffmpeg children instead of leaving them to interpreter shutdown.
for thread in threads:
    thread.join()
for process in (encoder_process, decoder_process):
    process.stdout.close()
    process.wait()
¹ 我进行了一些测试,发现解码器在解码当前帧之前,会一直等待下一帧的NAL头部 00 00 00 01 41 88(十六进制)。按理说只要有前缀 00 00 00 01 就足够了,但它还要等待后面的两个字节!
² 问题的先前版本。
h264 编码试图编码帧之间的差异以节省带宽,所以它肯定要等到第二帧才能得到差异吧? - Mark Setchell