使用Python的watchdog结合多进程或多线程技术

10
我正在使用Python的Watchdog监控某个目录以检测新文件的创建。当创建一个文件时,会运行一些代码,生成一个子进程shell命令来运行不同的代码以处理此文件。这应该运行每个新创建的文件。我已经测试过了,当一个文件被创建时,一切正常,但是在多个文件同时或接连被创建时,我遇到了麻烦。
我的当前问题是... shell中运行的处理代码需要一段时间才能运行,而且在目录中创建新文件之前不会完成。我无能为力解决它。当这段代码正在运行时,Watchdog将无法识别是否已创建新文件,并且将无法继续执行代码。所以我认为我需要为每个新文件生成一个新进程,或者做一些并行处理的事情,而不是等待一个文件完成后再处理下一个文件。
因此我的问题是:

1.) 实际上,我会有4个文件,分别在同一个目录下的不同系列中同时创建。最好的方法是如何让watchdog一次性对所有4个文件的创建运行代码?

2.) 当代码正在处理一个文件时,如何让watchdog开始处理同一系列中的下一个文件,而不必等待前一个文件的处理完成。这是必要的,因为文件很特殊,我需要暂停一个文件的处理,直到另一个文件完成,但它们被创建的顺序可能会有所不同。

我需要将我的watchdog与多进程或线程结合起来吗?还是我需要实现多个观察者?我有点茫然。谢谢任何帮助。

class MonitorFiles(FileSystemEventHandler):
    '''Sub-class of watchdog event handler'''

    def __init__(self, config=None, log=None):
        self.log = log
        self.config = config

    def on_created(self, event):
        file = os.path.basename(event.src_path)
        self.log.info('Created file {0}'.format(event.src_path))
        dosWatch.go(event.src_path, self.config, self.log)

    def on_modified(self, event):
        file = os.path.basename(event.src_path)
        ext = os.path.splitext(file)[1]
        if ext == '.fits':
            self.log.warning('Modifying a FITS file is not allowed')
            return

    def on_deleted(self, event):
        self.log.critical('Nothing should ever be deleted from here!')
        return      

主要监控

def monitor(config, log):
    '''Uses the Watchdog package to monitor the data directory for new files.
    See the MonitorFiles class in dosClasses for actual monitoring code'''

    event_handler = dosclass.MonitorFiles(config, log)

    # add logging the the event handler
    log_handler = LoggingEventHandler()

    # set up observer
    observer = Observer()
    observer.schedule(event_handler, path=config.fitsDir, recursive=False)
    observer.schedule(log_handler, config.fitsDir, recursive=False)
    observer.start()
    log.info('Begin MaNGA DOS!')
    log.info('Start watching directory {0} for new files ...'.format(config.fitsDir))

    # monitor
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.unschedule_all()
        observer.stop()
        log.info('Stop watching directory ...')
        log.info('End MaNGA DOS!')
        log.info('--------------------------')
        log.info('')
    observer.join() 

在上述内容中,我的监视方法设置了看门狗来监视主目录。MonitorFiles类定义了文件创建时发生的操作。它基本上调用了dosWatch.go方法,最终调用了subprocess.Popen来运行一个shell命令。

你能不能只生成子进程而不等待它们完成呢? - Janne Karila
我以为我正在做的就是这样。dosWatch.go调用了一些东西,但最终运行了p=subprocess.Popen(cmd, shell=shell, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)来运行一些其他代码。我不想等到它完成,但出于某种原因它正在这样做。我在某个地方读到,看门狗基本上是线程的子类,而observer.join()是使进程等待完成的罪魁祸首。但我对线程包知之甚少。 - havok2063
watchdog内部,有一个线程调用您的FileSystemEventHandler对象。所以,是的,您是正确的,在这个线程中按顺序调用on_*方法。如果您想在完成工作之前接收下一个事件,则需要生成一个线程并立即从on_created返回。确保dosWatch.go真的没有阻塞on_created - gwohpq9
实际上,情况并不像我想象的那么糟糕。看门狗队列事件,结果对我来说完全没问题。我只需要添加一些代码来处理一些文件排序问题。然后,我将整个过程包装在一个多进程命令中,以生成一个看门狗进程来监视4种类型的文件。目前看起来运行良好。 - havok2063
2个回答

2
以下是我最终采取的解决方法,成功解决了我的问题。我使用多进程启动一个独立的看门狗监控进程,以单独监视每个文件。Watchdog已经为我排队了新文件,这对我来说很好。
至于上面提到的第2点,我需要在处理file1之前处理file2,即使file1先被创建。因此,在处理file1时,我会检查file2处理的输出情况。如果找到了,就会继续处理file1。如果没有找到,则退出。在处理file2时,我会检查是否已经创建了file1,如果是,则处理file1。(未显示此代码)
主要摄像头监控
def monitorCam(camera, config, mainlog):
    '''Uses the Watchdog package to monitor the data directory for new files.
    See the MonitorFiles class in dosClasses for actual monitoring code.  Monitors each camera.'''

    mainlog.info('Process Name, PID: {0},{1}'.format(mp.current_process().name,mp.current_process().pid))

    #init cam log
    camlog = initLogger(config, filename='manga_dos_{0}'.format(camera))
    camlog.info('Camera {0}, PID {1} '.format(camera,mp.current_process().pid))
    config.camera=camera

    event_handler = dosclass.MonitorFiles(config, camlog, mainlog)

    # add logging the the event handler
    log_handler = LoggingEventHandler()

    # set up observer
    observer = Observer()
    observer.schedule(event_handler, path=config.fitsDir, recursive=False)
    observer.schedule(log_handler, config.fitsDir, recursive=False)
    observer.daemon=True
    observer.start()
    camlog.info('Begin MaNGA DOS!')
    camlog.info('Start watching directory {0} for new files ...'.format(config.fitsDir))
    camlog.info('Watching directory {0} for new files from camera {1}'.format(config.fitsDir,camera))

    # monitor
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.unschedule_all()
        observer.stop()
        camlog.info('Stop watching directory ...')
        camlog.info('End MaNGA DOS!')
        camlog.info('--------------------------')
        camlog.info('')
    #observer.join()

    if observer.is_alive():
        camlog.info('still alive')
    else:
        camlog.info('thread ending')    

多相机进程的启动

def startProcess(camera,config,log):
    ''' Uses multiprocessing module to start 4 different camera monitoring processes'''

    jobs=[]

    #pdb.set_trace()

    #log.info(mp.log_to_stderr(logging.DEBUG))
    for i in range(len(camera)):
        log.info('Starting to monitor camera {0}'.format(camera[i]))
        print 'Starting to monitor camera {0}'.format(camera[i])
        try:
            p = mp.Process(target=monitorCam, args=(camera[i],config, log), name=camera[i])
            p.daemon=True
            jobs.append(p)
            p.start()
        except KeyboardInterrupt:
            log.info('Ending process: {0} for camera {1}'.format(mp.current_process().pid, camera[i]))
            p.terminate()
            log.info('Terminated: {0}, {1}'.format(p,p.is_alive()))

    for i in range(len(jobs)):
        jobs[i].join()  

    return      

我有一个类似的问题,你能帮忙吗? - IamKarim1992
https://dev59.com/wbfna4cB1Zd3GeqPlAaz - IamKarim1992

1

我不确定每个文件都开一个线程是否有意义。 GIL 可能会消除你从中获得的任何优势,甚至可能严重影响性能并导致一些意外的行为。 我个人没有发现 watchdog 特别可靠。 你可以考虑实现自己的文件监视器,这在 django 框架中可以很容易地完成(请参见 此处),创建一个字典,其中包含每个文件的修改时间戳。


是的,我可能不需要为每个单独的文件开一个线程。但是我确实需要为我的4个单独文件中的每一个进程。这些需要同时处理,彼此独立。每组中的后续文件可以排队在同一线程上。我考虑过实现自己的文件监视器,但这似乎是有人已经比我做得更好的事情。不过我会浏览一下这个Django示例。 - havok2063

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接