Detect and record audio in Python

111

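I need to capture audio clips as WAV files that I can then pass to another Python module for processing. The problem is determining when audio is present, recording it, stopping the recording when it goes silent, and then passing that file to the processing module.

I think it should be possible to use the wave module to detect pure silence and discard it, start recording as soon as anything other than silence is detected, and stop recording when the line goes quiet again.

I just can't quite get my head around it. Can someone give me a basic example?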

6 Answers

119

As a follow-up to Nick Fortescue's answer, here is a more complete example of how to record from the microphone and process the resulting data:

from sys import byteorder
from array import array
from struct import pack

import pyaudio
import wave

THRESHOLD = 500
CHUNK_SIZE = 1024
FORMAT = pyaudio.paInt16
RATE = 44100

def is_silent(snd_data):
    "Returns 'True' if below the 'silent' threshold"
    return max(abs(i) for i in snd_data) < THRESHOLD

def normalize(snd_data):
    "Average the volume out"
    MAXIMUM = 16384
    times = float(MAXIMUM)/max(abs(i) for i in snd_data)

    r = array('h')
    for i in snd_data:
        r.append(int(i*times))
    return r

def trim(snd_data):
    "Trim the blank spots at the start and end"
    def _trim(snd_data):
        snd_started = False
        r = array('h')

        for i in snd_data:
            if not snd_started and abs(i)>THRESHOLD:
                snd_started = True
                r.append(i)

            elif snd_started:
                r.append(i)
        return r

    # Trim to the left
    snd_data = _trim(snd_data)

    # Trim to the right
    snd_data.reverse()
    snd_data = _trim(snd_data)
    snd_data.reverse()
    return snd_data

def add_silence(snd_data, seconds):
    "Add silence to the start and end of 'snd_data' of length 'seconds' (float)"
    silence = [0] * int(seconds * RATE)
    r = array('h', silence)
    r.extend(snd_data)
    r.extend(silence)
    return r

def record():
    """
    Record a word or words from the microphone and 
    return the data as an array of signed shorts.

    Normalizes the audio, trims silence from the 
    start and end, and pads with 0.5 seconds of 
    blank sound to make sure VLC et al can play 
    it without getting chopped off.
    """
    p = pyaudio.PyAudio()
    stream = p.open(format=FORMAT, channels=1, rate=RATE,
        input=True,
        frames_per_buffer=CHUNK_SIZE)

    num_silent = 0
    snd_started = False

    r = array('h')

    while 1:
        # little endian, signed short
        snd_data = array('h', stream.read(CHUNK_SIZE))
        if byteorder == 'big':
            snd_data.byteswap()
        r.extend(snd_data)

        silent = is_silent(snd_data)

        if silent and snd_started:
            num_silent += 1
        elif not silent and not snd_started:
            snd_started = True

        if snd_started and num_silent > 30:
            break

    sample_width = p.get_sample_size(FORMAT)
    stream.stop_stream()
    stream.close()
    p.terminate()

    r = normalize(r)
    r = trim(r)
    r = add_silence(r, 0.5)
    return sample_width, r

def record_to_file(path):
    "Records from the microphone and outputs the resulting data to 'path'"
    sample_width, data = record()
    data = pack('<' + ('h'*len(data)), *data)

    wf = wave.open(path, 'wb')
    wf.setnchannels(1)
    wf.setsampwidth(sample_width)
    wf.setframerate(RATE)
    wf.writeframes(data)
    wf.close()

if __name__ == '__main__':
    print("please speak a word into the microphone")
    record_to_file('demo.wav')
    print("done - result written to demo.wav")
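The stop condition inside `record()` can be pulled out into a pure function, which makes it easy to try without a microphone. This is a minimal sketch, not part of the answer: `find_recording_end` and `max_silent_chunks` are hypothetical names, the chunks are assumed to already be `array('h')` objects, and `is_silent` here compares the absolute peak. Like the original loop, it never resets the silent-chunk counter once sound has started.

```python
from array import array

THRESHOLD = 500  # same threshold as the answer above


def is_silent(chunk):
    """True if the loudest sample (by magnitude) is below THRESHOLD."""
    return max((abs(s) for s in chunk), default=0) < THRESHOLD


def find_recording_end(chunks, max_silent_chunks=30):
    """Return how many chunks record() would keep before stopping.

    Waits for the first non-silent chunk, then stops once more than
    max_silent_chunks silent chunks have followed it (the counter is
    cumulative, as in the answer's loop)."""
    started = False
    num_silent = 0
    for index, chunk in enumerate(chunks):
        silent = is_silent(chunk)
        if silent and started:
            num_silent += 1
        elif not silent and not started:
            started = True
        if started and num_silent > max_silent_chunks:
            return index + 1
    return len(chunks)  # input ran out before the stop condition was met


if __name__ == '__main__':
    quiet = array('h', [0, 10, -10, 0])
    loud = array('h', [0, -2000, 1500, 0])
    chunks = [quiet, quiet, loud, quiet, quiet, quiet, quiet]
    print(find_recording_end(chunks, max_silent_chunks=2))  # 6
```

With two quiet chunks, one loud chunk and four quiet chunks, recording starts at the third chunk and stops at the third silent chunk after it, so 6 chunks are kept.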

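Great example! It was very helpful when I was trying to record sound with Python. A quick question: is there a way to set the length of the recording? At the moment it only records a single word, right? Could I adjust it to record, say, 10 seconds? Thanks! - Swan87
Detection and normalization are not correct, because they operate on bytes rather than shorts. The buffer has to be converted to a numpy array before processing. - ArekBulski
The popularity of this very long answer suggests that a beginner-friendly audio library for Python would be welcome. - Josiah Yoder
I found it very helpful to put a try: ... except: print('Breaking...') block around the loop in def record(). Pressing Ctrl-C then stops the recording early. Although this answers the OP's question perfectly, silence is not a convenient way to mark the end of longer recordings. - Josiah Yoder
I had to replace data = pack('<' + ('h'*len(data)), *data) with data = data.tobytes() so as not to run out of memory on a half-hour recording. - Josiah Yoder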
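Following up on the memory comment: an `array('h')` can be written without building a huge `pack()` format string by calling `tobytes()`, swapping the bytes first on a big-endian machine because WAV stores 16-bit samples little-endian. A minimal sketch; `samples_to_wav_bytes` is a hypothetical helper, and it writes into an in-memory `io.BytesIO` so it can be checked without touching disk.

```python
import io
import sys
import wave
from array import array


def samples_to_wav_bytes(samples, rate=44100):
    """Encode mono signed 16-bit samples as a complete WAV file in memory."""
    data = array('h', samples)  # copy, so the caller's array is not modified
    if sys.byteorder == 'big':
        data.byteswap()  # WAV stores 16-bit PCM little-endian
    buf = io.BytesIO()
    with wave.open(buf, 'wb') as wf:
        wf.setnchannels(1)
        wf.setsampwidth(2)  # 2 bytes = 16 bits, matching paInt16
        wf.setframerate(rate)
        wf.writeframes(data.tobytes())  # no per-sample pack() arguments
    return buf.getvalue()
```

Passing a filename such as `'demo.wav'` to `wave.open` instead of the `BytesIO` writes the same data to disk.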

49

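I don't think the WAVE module supports recording, only processing existing files. If you want to actually record sound, you might want to look at PyAudio.

WAV is about the simplest file format in the world. With paInt16 you just get a signed integer representing the level, and the closer it is to 0, the quieter. I can't remember whether WAV files are high byte first or low byte first, but something like this should work (sorry, I'm not really a Python programmer):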

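from array import array

# you'll probably want to experiment on threshold
# depends how noisy the signal
threshold = 10

def is_sound(data):
    # interpret the raw paInt16 bytes as signed 16-bit integers
    as_ints = array('h', data)
    # compare magnitudes so that loud negative samples also count
    max_value = max((abs(i) for i in as_ints), default=0)
    return max_value > threshold  # True means "not silence"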
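On getting the level of a chunk: a minimal sketch that parses the raw bytes with an explicit little-endian format (`'<h'`), the byte order WAV files use and, as far as I know, what PyAudio's paInt16 delivers on common little-endian hardware such as x86. `chunk_levels` is a hypothetical helper; it returns the peak magnitude and also the RMS level, an alternative measure not used in the answer that is less affected by a single click than the peak.

```python
import math
import struct


def chunk_levels(data):
    """Return (peak, rms) for a chunk of little-endian signed 16-bit samples."""
    samples = [s for (s,) in struct.iter_unpack('<h', data)]
    if not samples:
        return 0, 0.0
    peak = max(abs(s) for s in samples)
    rms = math.sqrt(sum(s * s for s in samples) / len(samples))
    return peak, rms
```

Either value can then be compared with `threshold`; RMS thresholds are usually lower than peak thresholds for the same signal.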

Here is the PyAudio code for recording, kept for reference:

import pyaudio
import sys

chunk = 1024
FORMAT = pyaudio.paInt16
CHANNELS = 1
RATE = 44100
RECORD_SECONDS = 5

p = pyaudio.PyAudio()

stream = p.open(format=FORMAT,
                channels=CHANNELS, 
                rate=RATE, 
                input=True,
                output=True,
                frames_per_buffer=chunk)

print("* recording")
for i in range(0, int(RATE / chunk * RECORD_SECONDS)):
    data = stream.read(chunk)
    # check for silence here by comparing the level with 0 (or some threshold) for 
    # the contents of data.
    # then write data or not to a file

print("* done")

stream.stop_stream()
stream.close()
p.terminate()
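On recording for a fixed length of time (as asked in a comment on the first answer): the number of reads is just seconds times rate divided by the chunk size. A minimal sketch with a hypothetical helper; it rounds up so the recording is never shorter than requested. In Python 3, `44100 / chunk` is a float, which `range()` rejects, hence the explicit integer.

```python
import math


def chunks_for_duration(seconds, rate=44100, chunk=1024):
    """Number of stream.read(chunk) calls needed to cover `seconds` of audio.
    Rounds up, so the last read may run slightly past the requested length."""
    return math.ceil(seconds * rate / chunk)


if __name__ == '__main__':
    print(chunks_for_duration(5))  # 216
```

In the loop above this would read `for i in range(chunks_for_duration(RECORD_SECONDS, RATE, chunk)):`.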

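Thanks Nick. Yes, I'm also using portaudio for the capture; where I'm stuck is detecting silence. How do I get the level of a chunk of data? - Sam Machin
I've added some very simple, untested code above, but it should do what you want. - Nick Fortescue
My previous version had a bug: it didn't handle the sign correctly. I now use the library function array() to parse the data properly. - Nick Fortescue
The WAV file format is a container; it can hold audio encoded with various codecs (such as GSM or MP3), some of which are far from "the simplest in the world". - Jacek Konieczny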
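I don't think the option output=True is needed when opening the stream for recording, and it seems to cause "IOError: [Errno Input overflowed] -9981" on my device. Otherwise, thanks for the code sample, it was very helpful. - Binus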
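Because WAV is a container, a file handed to the silence check is only safe to read as `array('h')` if it holds uncompressed 16-bit PCM. A minimal sketch, assuming the file arrives as bytes; `is_pcm16` is a hypothetical helper. The standard `wave` module only reads uncompressed PCM and raises `wave.Error` for data it cannot parse, so that case is treated as "not usable" too.

```python
import io
import wave


def is_pcm16(wav_bytes):
    """True if the WAV data is uncompressed 16-bit PCM that array('h') can parse."""
    try:
        with wave.open(io.BytesIO(wav_bytes), 'rb') as wf:
            return wf.getcomptype() == 'NONE' and wf.getsampwidth() == 2
    except wave.Error:
        # not a RIFF/WAVE file, or a format the wave module does not support
        return False
```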

21

Thanks to cryo for the improved version; I based my tested code below on it:

#Instead of adding silence at start and end of recording (values=0) I add the original audio. This makes audio sound more natural as volume is >0. See trim()
#I also fixed an issue with the previous code: the accumulated silence counter needs to be cleared once recording resumes.

from array import array
from struct import pack
from sys import byteorder
import copy
import pyaudio
import wave

THRESHOLD = 500  # audio levels not normalised.
CHUNK_SIZE = 1024
SILENT_CHUNKS = 3 * 44100 / 1024  # about 3sec
FORMAT = pyaudio.paInt16
FRAME_MAX_VALUE = 2 ** 15 - 1
NORMALIZE_MINUS_ONE_dB = 10 ** (-1.0 / 20)
RATE = 44100
CHANNELS = 1
TRIM_APPEND = RATE // 4  # integer division: trim() uses this in slice indices, which must be ints in Python 3

def is_silent(data_chunk):
    """Returns 'True' if below the 'silent' threshold"""
    return max(abs(i) for i in data_chunk) < THRESHOLD

def normalize(data_all):
    """Amplify the volume out to max -1dB"""
    # MAXIMUM = 16384
    normalize_factor = (float(NORMALIZE_MINUS_ONE_dB * FRAME_MAX_VALUE)
                        / max(abs(i) for i in data_all))

    r = array('h')
    for i in data_all:
        r.append(int(i * normalize_factor))
    return r

def trim(data_all):
    _from = 0
    _to = len(data_all) - 1
    for i, b in enumerate(data_all):
        if abs(b) > THRESHOLD:
            _from = max(0, i - TRIM_APPEND)
            break

    for i, b in enumerate(reversed(data_all)):
        if abs(b) > THRESHOLD:
            _to = min(len(data_all) - 1, len(data_all) - 1 - i + TRIM_APPEND)
            break

    return copy.deepcopy(data_all[_from:(_to + 1)])

def record():
    """Record a word or words from the microphone and 
    return the data as an array of signed shorts."""

    p = pyaudio.PyAudio()
    stream = p.open(format=FORMAT, channels=CHANNELS, rate=RATE, input=True, frames_per_buffer=CHUNK_SIZE)

    silent_chunks = 0
    audio_started = False
    data_all = array('h')

    while True:
        # little endian, signed short
        data_chunk = array('h', stream.read(CHUNK_SIZE))
        if byteorder == 'big':
            data_chunk.byteswap()
        data_all.extend(data_chunk)

        silent = is_silent(data_chunk)

        if audio_started:
            if silent:
                silent_chunks += 1
                if silent_chunks > SILENT_CHUNKS:
                    break
            else: 
                silent_chunks = 0
        elif not silent:
            audio_started = True              

    sample_width = p.get_sample_size(FORMAT)
    stream.stop_stream()
    stream.close()
    p.terminate()

    data_all = trim(data_all)  # we trim before normalize as threshold applies to un-normalized wave (as well as is_silent() function)
    data_all = normalize(data_all)
    return sample_width, data_all

def record_to_file(path):
    "Records from the microphone and outputs the resulting data to 'path'"
    sample_width, data = record()
    data = pack('<' + ('h' * len(data)), *data)

    wave_file = wave.open(path, 'wb')
    wave_file.setnchannels(CHANNELS)
    wave_file.setsampwidth(sample_width)
    wave_file.setframerate(RATE)
    wave_file.writeframes(data)
    wave_file.close()

if __name__ == '__main__':
    print("Wait in silence to begin recording; wait in silence to terminate")
    record_to_file('demo.wav')
    print("done - result written to demo.wav")
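The slicing problem reported in the comments comes from `RATE / 4` being a float in Python 3. A minimal sketch of the same trimming idea with integer indices; `trim_with_margin` is a hypothetical name, and like `trim()` above it keeps `margin` samples of context on each side and returns a copy of the input when nothing exceeds the threshold.

```python
from array import array

THRESHOLD = 500  # same un-normalised threshold as the answer above


def trim_with_margin(samples, margin):
    """Drop quiet samples at both ends, keeping `margin` samples of context
    around the first and last sample whose magnitude exceeds THRESHOLD."""
    margin = int(margin)  # e.g. RATE / 4 is a float in Python 3; slices need ints
    loud = [i for i, s in enumerate(samples) if abs(s) > THRESHOLD]
    if not loud:
        return samples[:]  # nothing loud: keep everything, as trim() does
    start = max(0, loud[0] - margin)
    end = min(len(samples), loud[-1] + 1 + margin)
    return samples[start:end]  # slicing an array returns a new array


if __name__ == '__main__':
    s = array('h', [0, 0, 0, 1000, 0, -900, 0, 0, 0])
    print(list(trim_with_margin(s, 1)))  # [0, 1000, 0, -900, 0]
```

Since slicing already returns a new array, no `copy.deepcopy` is needed here.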

Thanks, this works great. In my case I had to change return copy.deepcopy(data_all[_from:(_to + 1)]) to copy.deepcopy(data_all[int(_from):(int(_to) + 1)]) - lukassliacky
The fix suggested by lukassliacky is needed to make this very nice solution work; someone should accept that edit. - Lorenzo Sciuto
What does this answer add to Cryo's work? - Josiah Yoder

8
import pyaudio
import wave
from array import array

FORMAT=pyaudio.paInt16
CHANNELS=2
RATE=44100
CHUNK=1024
RECORD_SECONDS=15
FILE_NAME="RECORDING.wav"

audio=pyaudio.PyAudio() #instantiate the pyaudio

#recording prerequisites
stream=audio.open(format=FORMAT,channels=CHANNELS, 
                  rate=RATE,
                  input=True,
                  frames_per_buffer=CHUNK)

#starting recording
frames=[]

for i in range(0,int(RATE/CHUNK*RECORD_SECONDS)):
    data=stream.read(CHUNK)
    data_chunk=array('h',data)
    vol=max(abs(s) for s in data_chunk)
    if(vol>=500):
        print("something said")
        frames.append(data)
    else:
        print("nothing")
    print("\n")


#end of recording
stream.stop_stream()
stream.close()
audio.terminate()
#writing to file
wavfile=wave.open(FILE_NAME,'wb')
wavfile.setnchannels(CHANNELS)
wavfile.setsampwidth(audio.get_sample_size(FORMAT))
wavfile.setframerate(RATE)
wavfile.writeframes(b''.join(frames))#append frames recorded to file
wavfile.close()

I think this will help. It is a simple script that checks for silence: if a chunk is silent it is not recorded, otherwise it is.
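With `CHANNELS=2` the samples in each chunk are interleaved (left, right, left, right, ...), so `max(data_chunk)` in the script looks at both channels at once. If you want to know which channel is loud, here is a minimal sketch; `channel_peaks` is a hypothetical helper and uses magnitudes rather than the signed maximum.

```python
from array import array


def channel_peaks(data, channels=2):
    """Peak magnitude of each channel in a chunk of interleaved signed
    16-bit frames (stereo samples are ordered L, R, L, R, ...)."""
    samples = array('h', data)  # raw bytes from stream.read(), native byte order
    return [max((abs(s) for s in samples[ch::channels]), default=0)
            for ch in range(channels)]
```

In the script's loop, `if max(channel_peaks(data, CHANNELS)) >= 500:` would gate on the loudest channel.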


2

The pyaudio site has lots of examples that are short and clear: http://people.csail.mit.edu/hubert/pyaudio/

Update 14 December 2019 - the main example from the site linked above, dating from 2017:


"""PyAudio Example: Play a WAVE file."""

import pyaudio
import wave
import sys

CHUNK = 1024

if len(sys.argv) < 2:
    print("Plays a wave file.\n\nUsage: %s filename.wav" % sys.argv[0])
    sys.exit(-1)

wf = wave.open(sys.argv[1], 'rb')

p = pyaudio.PyAudio()

stream = p.open(format=p.get_format_from_width(wf.getsampwidth()),
                channels=wf.getnchannels(),
                rate=wf.getframerate(),
                output=True)

data = wf.readframes(CHUNK)

while data:  # readframes() returns b'' (not '') at the end of the file in Python 3
    stream.write(data)
    data = wf.readframes(CHUNK)

stream.stop_stream()
stream.close()

p.terminate()
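In Python 3 `readframes()` returns `bytes`, so comparing its result with the str `''` never ends the loop. A minimal sketch of the reading loop as a generator that stops on the empty `bytes` value; `read_in_chunks` is a hypothetical helper that accepts a filename or file object, and the demo builds a tiny WAV in memory instead of using a real file.

```python
import io
import wave


def read_in_chunks(source, chunk=1024):
    """Yield raw frame bytes from a WAV file (path or file object), `chunk`
    frames at a time. readframes() returns b'' at the end in Python 3."""
    with wave.open(source, 'rb') as wf:
        while True:
            data = wf.readframes(chunk)
            if not data:  # b'' is falsy; comparing with '' would never match
                break
            yield data


if __name__ == '__main__':
    # Build a tiny in-memory WAV (5 silent mono 16-bit frames) for the demo.
    buf = io.BytesIO()
    with wave.open(buf, 'wb') as wf:
        wf.setnchannels(1)
        wf.setsampwidth(2)
        wf.setframerate(8000)
        wf.writeframes(b'\x00\x00' * 5)
    buf.seek(0)
    for data in read_in_chunks(buf, chunk=2):
        print(len(data))  # 4, 4, 2 (bytes per chunk of up to 2 frames)
```

In the playback example, the loop body would simply be `stream.write(data)`.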

The question isn't about playing audio back; it's about recording plus detecting and removing silence. - Marki555

0

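You might also want to look at Csound. It has several APIs, including one for Python. It may be able to interface with an A-D converter and collect sound samples.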


Page content provided by Stack Overflow.