I fixed the code written by Mike Schultz above. I also tried to set the RMS threshold automatically from the microphone's noise level, but failed miserably, so you have to set the threshold to match your microphone's noise level manually (if you want to have a go at auto-calibration anyway, see the sketch after the code).
import pyaudio
import math
import struct
import wave
import time
import datetime
import os

TRIGGER_RMS = 10                    # start recording above this RMS level (tune to your mic)
RATE = 16000                        # sample rate, Hz
TIMEOUT_SECS = 1                    # seconds of silence that end a recording
FRAME_SECS = 0.25                   # length of one frame read from the stream, seconds
CUSHION_SECS = 1                    # seconds of quiet audio kept around the sound
SHORT_NORMALIZE = (1.0 / 32768.0)   # scales int16 samples into [-1.0, 1.0]
FORMAT = pyaudio.paInt16
CHANNELS = 1
SHORT_WIDTH = 2                     # bytes per int16 sample
CHUNK = int(RATE * FRAME_SECS)      # samples per frame
CUSHION_FRAMES = int(CUSHION_SECS / FRAME_SECS)
TIMEOUT_FRAMES = int(TIMEOUT_SECS / FRAME_SECS)

f_name_directory = './'             # where the .wav files are saved

class Recorder:

    @staticmethod
    def rms(frame):
        # root-mean-square level of one frame, scaled up to readable numbers
        count = len(frame) // SHORT_WIDTH
        shorts = struct.unpack('%dh' % count, frame)
        sum_squares = 0.0
        for sample in shorts:
            n = sample * SHORT_NORMALIZE
            sum_squares += n * n
        rms = math.pow(sum_squares / count, 0.5)
        return rms * 1000

    def __init__(self):
        self.p = pyaudio.PyAudio()
        self.stream = self.p.open(format=FORMAT,
                                  channels=CHANNELS,
                                  rate=RATE,
                                  input=True,
                                  output=True,
                                  frames_per_buffer=CHUNK)
        self.time = time.time()
        self.quiet = []        # circular buffer of the most recent quiet frames
        self.quiet_idx = -1
        self.timeout = 0

    def record(self):
        print('')
        sound = []
        start = time.time()
        begin_time = None
        while True:
            data = self.stream.read(CHUNK)
            rms_val = self.rms(data)
            if self.inSound(data):
                sound.append(data)
                if begin_time is None:
                    begin_time = datetime.datetime.now()
            else:
                if len(sound) > 0:
                    self.write(sound, begin_time)
                    sound.clear()
                    begin_time = None
                else:
                    self.queueQuiet(data)
            curr = time.time()
            secs = int(curr - start)
            tout = 0 if self.timeout == 0 else int(self.timeout - curr)
            label = 'Listening' if self.timeout == 0 else 'Recording'
            print('[+] %s: Level=[%4.2f] Secs=[%d] Timeout=[%d]' % (label, rms_val, secs, tout), end='\r')

    # keep the last CUSHION_FRAMES quiet frames in a circular buffer
    def queueQuiet(self, data):
        self.quiet_idx += 1
        if self.quiet_idx == CUSHION_FRAMES:
            self.quiet_idx = 0
        if len(self.quiet) < CUSHION_FRAMES:
            self.quiet.append(data)
        else:
            self.quiet[self.quiet_idx] = data

    # prepend the buffered quiet frames, oldest first, to the recorded sound
    def dequeueQuiet(self, sound):
        if len(self.quiet) == 0:
            return sound
        ret = []
        if len(self.quiet) < CUSHION_FRAMES:
            ret.extend(self.quiet)   # extend, not append: keep a flat list of frames
            ret.extend(sound)
        else:
            ret.extend(self.quiet[self.quiet_idx + 1:])
            ret.extend(self.quiet[:self.quiet_idx + 1])
            ret.extend(sound)
        return ret

    def inSound(self, data):
        rms = self.rms(data)
        curr = time.time()
        if rms > TRIGGER_RMS:
            # loud frame: (re)arm the silence timeout
            self.timeout = curr + TIMEOUT_SECS
            return True
        if curr < self.timeout:
            return True   # quiet frame, but still inside the timeout window
        self.timeout = 0
        return False

    def write(self, sound, begin_time):
        # prepend the quiet cushion, then drop the trailing timeout silence
        # while keeping a cushion of it at the end
        sound = self.dequeueQuiet(sound)
        keep_frames = len(sound) - TIMEOUT_FRAMES + CUSHION_FRAMES
        recording = b''.join(sound[0:keep_frames])
        filename = begin_time.strftime('%Y-%m-%d_%H.%M.%S')
        pathname = os.path.join(f_name_directory, '{}.wav'.format(filename))
        wf = wave.open(pathname, 'wb')
        wf.setnchannels(CHANNELS)
        wf.setsampwidth(self.p.get_sample_size(FORMAT))
        wf.setframerate(RATE)
        wf.writeframes(recording)
        wf.close()
        print('[+] Saved: {}'.format(pathname))

a = Recorder()
a.record()
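For anyone who still wants to try the auto-calibration idea, the sketch below shows one possible starting point: measure the ambient RMS for a couple of seconds while the room is quiet and put the trigger some margin above the loudest frame. It is untested, and calibrate_trigger, the two-second window, and the 1.5 margin are arbitrary choices of mine, not part of the code above.

# Untested sketch: derive TRIGGER_RMS from the ambient noise floor.
def calibrate_trigger(recorder, calibration_secs=2.0, margin=1.5):
    levels = []
    for _ in range(int(calibration_secs / FRAME_SECS)):
        data = recorder.stream.read(CHUNK)
        levels.append(recorder.rms(data))
    # put the trigger a margin above the loudest ambient frame
    return max(levels) * margin

You would call it right after constructing the Recorder, assign the result to the module-level TRIGGER_RMS before record() starts, and stay quiet during the calibration window.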
Besides that, if anyone wants to detect human speech rather than sound in general, look into so-called voice activity detectors (VAD), like this, which offer SDKs for several platforms and are well suited to app development. There is also one called webrtc, but it is comparatively slow and less accurate.
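If you do try the WebRTC one, a minimal sketch with the py-webrtcvad Python package (pip install webrtcvad; my illustration, not from the original answer) looks like this. Note that it only accepts 16-bit mono PCM at 8, 16, 32 or 48 kHz, in frames of exactly 10, 20 or 30 ms, so the 0.25-second frames used above would have to be re-chunked first.

import webrtcvad

vad = webrtcvad.Vad(2)                        # aggressiveness: 0 (loose) to 3 (strict)
VAD_RATE = 16000
VAD_FRAME_BYTES = int(VAD_RATE * 0.03) * 2    # 30 ms of int16 samples

def frame_has_speech(frame):
    # frame must be exactly VAD_FRAME_BYTES of raw PCM
    return vad.is_speech(frame, VAD_RATE)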
Finally, you can train your own neural network model to detect speech, noise, exact words, or whatever else you want, although that takes considerably more time and effort.