树莓派上的Python延迟

4

我正在尝试模拟复合动作电位,以校准研究仪器。目标是在250 Hz时输出一定的10 µV信号。低电压将在后面处理,对我来说主要问题是频率。下图显示了我正在尝试制作的系统概述。

enter image description here

通过从活体动物中获取数据,并在MATLAB中处理数据,我制作了一个低噪声信号,采用12位格式,共有789个值。然后我使用Git将存储在csv格式的仓库克隆到树莓派上。下面是我在RPi上编写的Python脚本。您可以跳转到脚本中的def main以查看功能。
#!/usr/bin/python

import spidev
from time import sleep
import RPi.GPIO as GPIO
import csv
import sys
import math

DEBUG = False
spi_max_speed = 20 * 1000000
V_Ref = 5000
Resolution = 2**12
CE = 0

spi = spidev.SpiDev()
spi.open(0,CE)
spi.max_speed_hz = spi_max_speed

LDAQ = 22
GPIO.setmode(GPIO.BOARD)
GPIO.setup(LDAQ, GPIO.OUT)
GPIO.output(LDAQ,GPIO.LOW)

def setOutput(val):
    lowByte = val & 0b11111111 #Make bytes using MCP4921 data sheet info
    highByte = ((val >> 8) & 0xff) | 0b0 << 7 | 0b0 << 6 | 0b1 << 5 | 0b1 << 4
    if DEBUG :
        print("Highbyte = {0:8b}".format(highByte))
        print("Lowbyte =  {0:8b}".format(lowByte))
    spi.xfer2([highByte, lowByte])

def main():
    with open('signal12bit.csv') as signal:
        signal_length = float(raw_input("Please input signal length in ms: "))
        delay = float(raw_input("Please input delay after signal in ms: "))
        amplitude = float(raw_input("Please input signal amplitude in mV: "))
        print "Starting Simulant with signal length %.1f ms, delay %.1f ms and amplitude %.1f mV." % (signal_length, delay, amplitude)
        if not DEBUG : print "Press ctrl+c to close."
        sleep (1) #Wait a sec before starting
        read = csv.reader(signal, delimiter=' ', quotechar='|')
        try:
            while(True):
                signal.seek(0)
                for row in read: #Loop csv file rows
                    if DEBUG : print ', '.join(row)
                    setOutput(int(row)/int((V_Ref/amplitude))) #Adjust amplitude, not super necessary to do in software
                    sleep (signal_length/(data_points*1000) #Divide by 1000 to make into ms, divide by length of data
                sleep (delay/1000)
        except (KeyboardInterrupt, Exception) as e:
            print(e)
            print "Closing SPI channel"
            setOutput(0)
            GPIO.cleanup()
            spi.close()

if __name__ == '__main__':
    main()

这个脚本基本符合预期。将MCP4921 DAC的输出引脚连接到示波器上,可以看到它非常好地重现了信号,并且正确输出了随后的延迟。

不幸的是,数据点之间的间隔比我需要的要大得多。我可以将信号压缩到的最短时间是约79毫秒。这是由于在睡眠函数中除以789000,我知道这对于Python和Pi来说都是太多的,因为读取csv文件需要时间。但是,如果我尝试手动制作一个数组,并将这些值输出而不是读取csv文件,我可以在不损失任何东西的情况下实现超过6 kHz的频率。

我的问题是

如何使该信号以250 Hz的频率出现,并可靠地从用户输入减少?我考虑过在脚本中手动将789个值写入数组中,然后将SPI速度更改为适合250 Hz的任何值。这将消除缓慢的csv读取器功能,但是无法从用户输入中降低频率。无论如何,消除csv.read的需求都会有很大帮助。谢谢!


你可以将csv读取器中的数据做成一个列表,这样它就可以被缓存到内存中了。 - Jean-François Fabre
我尝试过这样做,但似乎没有任何区别。在这个脚本之前,我从未使用过Python编程语言,请问您能具体解释一下如何只读取CSV一次,然后循环缓存列表吗? - Tachyon
"250Hz" = 你的意思是每789个样本之间有4毫秒的间隔吗? - DisappointedByUnaccountableMod
将当前循环更改为仅执行一次,而不是调用SetOutput(),将该值附加到列表中:outputlist.append(),不要在该循环中使用sleep。然后添加另一个循环以输出outputlistvalue中的值:SetOutput(outputlistvalue) sleep() - DisappointedByUnaccountableMod
整个信号的频率为250赫兹。因此,在4毫秒内有789个值,每个值之间间隔44微秒。 - Tachyon
我已经添加了一个答案,告诉你如何编写将值存储在列表中并从该列表生成输出的代码 - 但是请仔细阅读注释,因为在Pi上运行Linux和Python时编码并不是真正的挑战。 Linux不是实时操作系统,您的Python代码将被计时器、I/O中断和进程切换中断。对于人类与Pi的交互,这些延迟并不重要,但是当您尝试获取微秒定时时,这些中断的长度就像一根线的长度一样可预测。 - DisappointedByUnaccountableMod
1个回答

1

今天早些时候我想到了解决方法,所以我想在这里发布一个答案,以防将来有人遇到类似的问题。

使用sleep()无法解决数据点之间的内部延迟问题,原因有几个。我最终做的是以下内容:

  • 所有数学和函数调用移出关键循环
  • 对没有延迟的传输值所需时间进行线性回归分析
  • 在MATLAB中将CSV文件中的数据点数量增加到“足够多”(9600)
  • 计算所需点数以满足用户所需的信号长度
  • 从现在更大的CSV文件中取出均匀分布的条目,以尽可能接近该点数。
  • 计算这些值,然后明确计算SPI字节
  • 保存这两个字节列表,并在关键循环中直接输出它们

带有一些输入检查的新代码如下:

#!/usr/bin/python

import spidev
from time import sleep
import RPi.GPIO as GPIO
import sys
import csv
import ast

spi_max_speed = 16 * 1000000 # 16 MHz
V_Ref = 5000 # 5V in mV
Resolution = 2**12 # 12 bits for the MCP 4921
CE = 0 # CE0 or CE1, select SPI device on bus
total_data_points = 9600 #CSV file length

spi = spidev.SpiDev()
spi.open(0,CE)
spi.max_speed_hz = spi_max_speed

LDAQ=22
GPIO.setmode(GPIO.BOARD)
GPIO.setup(LDAQ, GPIO.OUT)
GPIO.output(LDAQ,GPIO.LOW)

def main():

    #User inputs and checking for digits
    signalLengthU = raw_input("Input signal length in ms, minimum 4: ")
    if signalLengthU.isdigit():
        signalLength = signalLengthU
    else:
        signalLength = 4

    delayU = raw_input("Input delay after signal in ms: ")
    if delayU.isdigit():
        delay = delayU
    else:
        delay = 0

    amplitudeU = raw_input("Input signal amplitude in mV, between 1 and 5000: ")
    if amplitudeU.isdigit():
        amplitude = amplitudeU
    else:
        amplitude = 5000

    #Calculate data points, delay, and amplitude
    data_points = int((1000*float(signalLength)-24.6418)/12.3291)
    signalDelay = float(delay)/1000
    setAmplitude = V_Ref/float(amplitude)

    #Load and save CSV file
    datain = open('signal12bit.csv')
    read = csv.reader(datain, delimiter=' ', quotechar='|')
    signal = []
    for row in read:
        signal.append(ast.literal_eval(row[0]))

    #Downsampling to achieve desired signal length
    downsampling = int(round(total_data_points/data_points))
    signalSpeed = signal[0::downsampling]
    listlen = len(signalSpeed)

    #Construction of SPI bytes, to avoid calling functions in critical loop
    lowByte = []
    highByte = []
    for i in signalSpeed:
        lowByte.append(int(i/setAmplitude) & 0b11111111)
        highByte.append(((int(i/setAmplitude) >> 8) & 0xff) | 0b0 << 7 | 0b0 << 6 | 0b1 << 5 | 0b1 << 4)

    print "Starting Simulant with signal length %s ms, delay %s ms and amplitude %s mV." % (signalLength, delay, amplitude)
    print "Press ctrl+c to stop."
    sleep (1)

    try:
        while(True): #Main loop
            for i in range(listlen):
                spi.xfer2([highByte[i],lowByte[i]]) #Critical loop, no delay!
            sleep (signalDelay)
    except (KeyboardInterrupt, Exception) as e:
        print e
        print "Closing SPI channel"
        lowByte = 0 & 0b11111111
        highByte = ((0 >> 8) & 0xff) | 0b0 << 7 | 0b0 << 6 | 0b1 << 5 | 0b1 << 4
        spi.xfer2([highByte, lowByte])
        GPIO.cleanup()
        spi.close()

if __name__ == '__main__':
    main()

结果正是我想要的。下面是一个示例,来自信号长度为5毫秒、200赫兹的示波器。感谢你们的帮助!

Oscilloscope reading


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接