Python线程类用于GPIO LED闪烁

3

很遗憾,在尝试了各种方法都没有解决这个问题之后,我来到这里。

我的想法是:

我需要一个名为Led的类,该类在构造函数中只需接受GPIO引脚,并提供以下方法:

  • Light On(点亮)
  • Light Off(熄灭)
  • Blinking(闪烁)

我的做法是:

我已经按照以下方式构建了这个类:

import RPi.GPIO as GPIO
import time
import threading
from threading import Thread


class Led(Thread):

    def __init__(self, led_pin):
        Thread.__init__(self)
        self.pin_stop = threading.Event()
        self.__led_pin = led_pin
        GPIO.setmode(GPIO.BCM)
        GPIO.setup(self.__led_pin, GPIO.OUT)

    def low(self, pin):
        GPIO.setup(pin, GPIO.OUT)
        GPIO.output(pin, GPIO.LOW)

    def blink(self, time_on=0.050, time_off=1):
        pin = threading.Thread(name='ledblink',target=self.__blink_pin, args=(time_on, time_off, self.pin_stop))
        pin.start()

    def __blink_pin(self, time_on, time_off, pin_stop):
        while not pin_stop.is_set():
            GPIO.output(self.__led_pin, GPIO.HIGH)
            time.sleep(time_on)
            GPIO.output(self.__led_pin, GPIO.LOW)
            time.sleep(time_off)

    def __stop(self):
        self.pin_stop.set()


    def reset(self):
        GPIO.cleanup()

    def off(self):
        self.__stop()

    def on(self):
        self.__stop()
        GPIO.output(self.__led_pin, GPIO.LOW)
        GPIO.output(self.__led_pin, GPIO.HIGH)

blink 方法负责无限制地闪烁 LED,直到出现 Off 或 On 方法调用。

运行以下简单代码:

from classes.leds import Led
import time
from random import randint

Led16 = Led(16)

def main():
    while True:
        if (randint(0, 1) == 1):
            Led16.blink()
        else:
            Led16.off()
        time.sleep(2)


if __name__ == "__main__":
    main()

发生了什么:

Led对象似乎每次调用方法时都会生成一个新的线程,导致GPIO线路在多个线程之间共享。

我的愿望:

我希望保持LED异步闪烁(显然),并且能够控制Led16()对象的状态,也许不需要每次调用其方法时创建新的线程,但是到达这一点时,我有点困惑。

感谢帮助我理解如何实现这个目标。

4个回答

2
你正在创建大量的线程,因为每次调用blink()时都会创建一个新的线程,并且旧线程没有停止。
我认为有几个选项可供选择:
1. 只创建一次线程 - 例如在__init__()中 - 并在闪烁时间间隔上持续运行(即大部分时间处于睡眠状态),读取实例变量并相应地设置LED。要更改LED状态,blink()on()off()通过将此实例变量设置为on/off/blink来控制LED。
2. 在闪烁程序中,如果线程已经在运行,则不要创建新线程,或者停止旧线程(并等待其完成),然后启动新线程。
您需要处理的问题是希望行为是:
- 如果LED关闭,则在调用on()blink()时立即打开LED。 - 如果LED在闪烁,再次调用blink()则不会干扰闪烁的开/关序列。 - 如果在闪烁时调用off(),如果已经开始运行,则希望on周期运行到完成,即不应立即关闭LED,因为那可能是一个非常短暂的闪光,看起来很奇怪。
创建新线程的难点在于等待旧线程完成,而在__init__()中只创建一次线程并持续运行似乎最简单。当LED处于打开或关闭状态时,时间周期缩短(到值FAST_CYCLE),以便在关闭或打开LED时快速响应,因为sleep()的时间很短。
关于您的代码的一些其他要点:
- 我认为你不需要使你的类继承自Thread - 你在pin=...行中创建了一个新的线程。 - 如果在编写代码时添加注释,通常阅读代码时更容易理解正在发生的事情。 - 如果保留对线程的引用(即self.pin = threading.Thread而不是pin = threading.Thread),则可以在reset()中使用join()确保它在继续之前已退出。 - 由于闪烁时间间隔可能会更改,并且线程必须使用最新的值,因此每次读取它们时都使用self而不是将它们作为参数传递给__blink_pin()例程,如果这样做,您也可以使用self获取pin_stop信号量。
import RPi.GPIO as GPIO
import time
import threading
from threading import Thread

class Led(object):

    LED_OFF = 0
    LED_ON = 1
    LED_FLASHING = 2

    # the short time sleep to use when the led is on or off to ensure the led responds quickly to changes to blinking
    FAST_CYCLE = 0.05

    def __init__(self, led_pin):
        # create the semaphore used to make thread exit
        self.pin_stop = threading.Event()
        # the pin for the LED
        self.__led_pin = led_pin
        # initialise the pin and turn the led off
        GPIO.setmode(GPIO.BCM)
        GPIO.setup(self.__led_pin, GPIO.OUT)
        # the mode for the led - off/on/flashing
        self.__ledmode = Led.LED_OFF
        # make sure the LED is off (this also initialises the times for the thread)
        self.off()
        # create the thread, keep a reference to it for when we need to exit
        self.__thread = threading.Thread(name='ledblink',target=self.__blink_pin)
        # start the thread
        self.__thread.start()

    def blink(self, time_on=0.050, time_off=1):
        # blinking will start at the next first period
        # because turning the led on now might look funny because we don't know
        # when the next first period will start - the blink routine does all the
        # timing so that will 'just work'
        self.__ledmode = Led.LED_FLASHING
        self.__time_on = time_on
        self.__time_off = time_off

    def off(self):
        self.__ledmode = LED_OFF
        # set the cycle times short so changes to ledmode are picked up quickly
        self.__time_on = Led.FAST_CYCLE
        self.__time_off = Led.FAST_CYCLE
        # could turn the LED off immediately, might make for a short flicker on if was blinking

    def on(self):
        self.__ledmode = LED_ON
        # set the cycle times short so changes to ledmode are picked up quickly
        self.__time_on = Led.FAST_CYCLE
        self.__time_off = Led.FAST_CYCLE
        # could turn the LED on immediately, might make for a short flicker off if was blinking

    def reset(self):
        # set the semaphore so the thread will exit after sleep has completed
        self.pin_stop.set()
        # wait for the thread to exit
        self.__thread.join()
        # now clean up the GPIO
        GPIO.cleanup()

    ############################################################################
    # below here are private methods
    def __turnledon(self, pin):
        GPIO.output(pin, GPIO.LOW)

    def __turnledoff(self, pin):
        GPIO.output(pin, GPIO.HIGH)

    # this does all the work
    # If blinking, there are two sleeps in each loop
    # if on or off, there is only one sleep to ensure quick response to blink()
    def __blink_pin(self):
        while not self.pin_stop.is_set():
            # the first period is when the LED will be on if blinking
            if self.__ledmode == Led.LED_ON or self.__ledmode == Led.LED_FLASHING: 
                self.__turnledon()
            else:
                self.__turnledoff()
            # this is the first sleep - the 'on' time when blinking
            time.sleep(self.__time_on)
            # only if blinking, turn led off and do a second sleep for the off time
            if self.__ledmode == Led.LED_FLASHING:
                self.__turnledoff()
                # do an extra check that the stop semaphore hasn't been set before the off-time sleep
                if not self.pin_stop.is_set():
                    # this is the second sleep - off time when blinking
                    time.sleep(self.__time_off)

感谢 @barny 的时间和非常有理据的回答。 明天早上我想尝试你的建议,为社区提供对某人未来需求的反馈。 - Alessandro Mendolia
如果我回答了你的问题,那么按照惯例,请接受我的答案并点赞。 - DisappointedByUnaccountableMod

1

我不确定这会有帮助,但我想到了这个(使用gpiozero

from gpiozero import LED
import time
import threading

class LEDplus():
    def __init__(self,pinnumber):
        self.led = LED(pinnumber)
        self.__loop = True
        self.__threading = threading.Thread(target=self.__blink)


    def on(self,):
        self.__loop = False
        self.maybejoin()
        self.led.on()

    def off(self, ):
        self.__loop = False
        self.maybejoin()
        self.led.off()

    def maybejoin(self,):
        if self.__threading.isAlive():
            self.__threading.join()

    def blink(self, pitch):
        self.__threading = threading.Thread(target=self.__blink, args=(pitch, ))
        self.__threading.start()

    def __blink(self, pitch=.25):
        self.__loop = True
        while self.__loop:
            self.led.toggle()
            time.sleep(pitch/2)
        self.led.off()

green = LEDplus(18)
green.blink(1)

1

对于未来需要此代码的人,我对建议的代码进行了一些小调整,看起来正常工作。

再次感谢@barny

import RPi.GPIO as GPIO
import time
import threading

class Led(object):

    LED_OFF = 0
    LED_ON = 1
    LED_FLASHING = 2

    # the short time sleep to use when the led is on or off to ensure the led responds quickly to changes to blinking
    FAST_CYCLE = 0.05

    def __init__(self, led_pin):
        # create the semaphore used to make thread exit
        self.pin_stop = threading.Event()
        # the pin for the LED
        self.__led_pin = led_pin
        # initialise the pin and turn the led off
        GPIO.setmode(GPIO.BCM)
        GPIO.setup(self.__led_pin, GPIO.OUT)
        # the mode for the led - off/on/flashing
        self.__ledmode = Led.LED_OFF
        # make sure the LED is off (this also initialises the times for the thread)
        self.off()
        # create the thread, keep a reference to it for when we need to exit
        self.__thread = threading.Thread(name='ledblink',target=self.__blink_pin)
        # start the thread
        self.__thread.start()

    def blink(self, time_on=0.050, time_off=1):
        # blinking will start at the next first period
        # because turning the led on now might look funny because we don't know
        # when the next first period will start - the blink routine does all the
        # timing so that will 'just work'
        self.__ledmode = Led.LED_FLASHING
        self.__time_on = time_on
        self.__time_off = time_off

    def off(self):
        self.__ledmode = self.LED_OFF
        # set the cycle times short so changes to ledmode are picked up quickly
        self.__time_on = Led.FAST_CYCLE
        self.__time_off = Led.FAST_CYCLE
        # could turn the LED off immediately, might make for a short flicker on if was blinking

    def on(self):
        self.__ledmode = self.LED_ON
        # set the cycle times short so changes to ledmode are picked up quickly
        self.__time_on = Led.FAST_CYCLE
        self.__time_off = Led.FAST_CYCLE
        # could turn the LED on immediately, might make for a short flicker off if was blinking

    def reset(self):
        # set the semaphore so the thread will exit after sleep has completed
        self.pin_stop.set()
        # wait for the thread to exit
        self.__thread.join()
        # now clean up the GPIO
        GPIO.cleanup()

我非常希望能够使用这个,因为它正是我所需要的,但是上面的代码给了我错误提示,我不确定是否调用不正确。我采用了Alessandro的代码,并在结尾处添加了led=Led(24)作为我的调用,但我收到一个错误,说'int'对象不可调用,我还在__blink_pin上收到一个错误 - 任何帮助将不胜感激。 - Chris

1
Alessandro Mendolia的答案中,只缺少类的私有方法。下面添加了一些修复。 __turnledon()不需要参数-它可以访问在初始化中已经存储的self.__led_pin
############################################################################
# below here are private methods
def __turnledon(self):
    GPIO.output(self.__led_pin, GPIO.LOW)

def __turnledoff(self):
    GPIO.output(self.__led_pin , GPIO.HIGH)

# this does all the work
# If blinking, there are two sleeps in each loop
# if on or off, there is only one sleep to ensure quick response to blink()
def __blink_pin(self):
    while not self.pin_stop.is_set():
        # the first period is when the LED will be on if blinking
        if self.__ledmode == BlinkerLed.LED_ON or self.__ledmode == BlinkerLed.LED_FLASHING: 
            self.__turnledon()
        else:
            self.__turnledoff()
        # this is the first sleep - the 'on' time when blinking
        time.sleep(self.__time_on)
        # only if blinking, turn led off and do a second sleep for the off time
        if self.__ledmode == BlinkerLed.LED_FLASHING:
            self.__turnledoff()
            # do an extra check that the stop semaphore hasn't been set before the off-time sleep
            if not self.pin_stop.is_set():
                # this is the second sleep - off time when blinking
                time.sleep(self.__time_off)

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接