检查文件是否未被打开或正在被其他进程使用

77
在我的应用程序中,我有以下请求: 1. 有一个线程将定期记录一些日志到文件中。为了保持日志文件的小型化,日志文件将在一定时间间隔内进行翻转。 2. 还有另一个线程也会定期处理这些日志文件。例如:将日志文件移动到其他位置,解析日志内容以生成一些日志报告。
但是,有一个条件是第二个线程不能处理正在使用来记录日志的日志文件。在代码方面,伪代码类似于下面的样子:
#code in second thread to process the log files
for logFile in os.listdir(logFolder):
     if not file_is_open(logFile) or file_is_use(logFile):
          ProcessLogFile(logFile) # move log file to other place, and generate log report....

那么,如何检查文件是否已经打开或被其他进程使用? 我在互联网上进行了一些研究,并得出了一些结果:

try:
   myfile = open(filename, "r+") # or "a+", whatever you need
except IOError:
    print "Could not open file! Please close Excel!"

我试过这份代码,但无论是使用 "r+" 还是 "a+" 标记都无法正常工作。
try:
   os.remove(filename) # try to remove it directly
except OSError as e:
    if e.errno == errno.ENOENT: # file doesn't exist
        break

这段代码可以运行,但不能满足我的要求,因为我不想删除文件来检查它是否被打开。


你尝试过在最后一个try块中将os.remove更改为ProcessLogFile吗?也许可以调整错误编号:有EBUSY其他可以尝试。 - Lev Levitsky
1
你可能想阅读这个问题的链接:https://dev59.com/q3I-5IYBdhLWcg3wFkKO,尤其是这个答案:https://dev59.com/q3I-5IYBdhLWcg3wFkKO#7142094。 - Nicola Coretti
如何在Windows平台上执行类似的操作以列出打开的文件。 - zengwke
相关:https://dev59.com/ukbRa4cB1Zd3GeqP2rHa - djvg
9个回答

63
尝试查找文件是否被其他进程使用的问题在于可能会出现竞争条件。你可能会检查一个文件,认为它没有被使用,然后就在你打开它之前,另一个进程(或线程)突然抢占了它(甚至删除了它)。
好的,假设你决定接受这种可能性,并希望它不会发生。要检查其他进程正在使用的文件取决于操作系统。
在Linux上,这相当容易,只需迭代/proc中的PID即可。这是一个生成器,用于迭代特定PID使用的文件:
def iterate_fds(pid):
    dir = '/proc/'+str(pid)+'/fd'
    if not os.access(dir,os.R_OK|os.X_OK): return

    for fds in os.listdir(dir):
        for fd in fds:
            full_name = os.path.join(dir, fd)
            try:
                file = os.readlink(full_name)
                if file == '/dev/null' or \
                  re.match(r'pipe:\[\d+\]',file) or \
                  re.match(r'socket:\[\d+\]',file):
                    file = None
            except OSError as err:
                if err.errno == 2:     
                    file = None
                else:
                    raise(err)

            yield (fd,file)

在Windows上,情况就不那么简单了,因为API并没有公开。有一个sysinternals工具(handle.exe)可以使用,但我建议使用PyPi模块psutil,它是可移植的(也就是说,在Linux上运行良好,可能也适用于其他操作系统):

import psutil

for proc in psutil.process_iter():
    try:
        # this returns the list of opened files by the current process
        flist = proc.open_files()
        if flist:
            print(proc.pid,proc.name)
            for nt in flist:
                print("\t",nt.path)

    # This catches a race condition where a process ends
    # before we can examine its files    
    except psutil.NoSuchProcess as err:
        print("****",err) 

感谢您的回答。但是,很抱歉我无法尝试安装psutil包。由于应用程序框架的限制,我无法包含其他第三方软件包。有没有办法可以使用纯python2.4来实现这个功能? - zengwke
不使用标准库。另一个选择是在C中编写它,或者使用ctypes - 这需要大量的工作。 - cdarke
4
非常好,但在你的 Linux 示例中,我建议使用 errno.ENOENT 而不是值 2。 - kmarsh
@zengwke:为什么不在你的账户下安装 psutil,即 ~/ - 0 _
5
这对我有用,但我还必须捕获 psutil.AccessDenied 异常。 - FHTMitchell
显示剩余2条评论

42

我喜欢 Daniel 的回答,但是对于Windows用户,我意识到将文件重命名为它已经拥有的名称更安全更简单。这解决了评论中提出的问题。这是代码:

import os

f = 'C:/test.xlsx'
if os.path.exists(f):
    try:
        os.rename(f, f)
        print 'Access on file "' + f +'" is available!'
    except OSError as e:
        print 'Access-error on file "' + f + '"! \n' + str(e)

9
我很确定这在非Windows操作系统上不起作用(我在Linux系统中轻松地重命名了一个正在其他进程中打开的数据库文件)。 - Big_Al_Tx

37
您可以使用下一个函数来检查文件是否有句柄(请记得将完整路径传递给该文件):

您可以使用下一个函数来检查文件是否有句柄(请记得将完整路径传递给该文件):

import psutil

def has_handle(fpath):
    for proc in psutil.process_iter():
        try:
            for item in proc.open_files():
                if fpath == item.path:
                    return True
        except Exception:
            pass

    return False

1
非常好!谢谢。 - ZHAJOR
2
非常棒的解决方案。这是跨平台的吗?在Linux上对我很有效,Windows呢? - David Parks
1
我在使用vim编辑文件时,但它返回了False。这里有什么问题吗? - DennisLi
2
@DennisLi 我也遇到了同样的问题。Vim似乎使用.swp文件,并将其保存在~/.config中的vim目录中。原始文件并没有被Vim(或者在我这里是Neovim)保持打开状态。 - C14L
1
这对我没用,不知为何它仍然返回 False(Ubuntu 18.04,Python 3.7)。 - YTZ
显示剩余6条评论

9

我知道我来晚了,但我也遇到了这个问题,我使用了lsof命令来解决它(我认为这是新的方法)。通过lsof,我们可以检查正在使用该特定文件的进程。以下是我的做法:

from subprocess import check_output,Popen, PIPE
try:
   lsout=Popen(['lsof',filename],stdout=PIPE, shell=False)
   check_output(["grep",filename], stdin=lsout.stdout, shell=False)
except:
   #check_output will throw an exception here if it won't find any process using that file

只需在 except 部分编写日志处理代码,您就可以开始了。


你能检查一下lsof的链接吗?谢谢。 - Cloud Cho
但是它在Windows上不起作用,因为'lsof'是Linux实用程序。 - Aman

3

这是一个稍微更加精细的版本,基于上面其中一条回答

from pathlib import Path


def is_file_in_use(file_path):
    path = Path(file_path)
    
    if not path.exists():
        raise FileNotFoundError
    
    try:
        path.rename(path)
    except PermissionError:
        return True
    else:
        return False

3
你可以使用 inotify 来监视文件系统中的活动。你可以监听文件关闭事件,这表示发生了滚动。你还应该在文件大小上添加附加条件。确保从第二个线程中过滤掉文件关闭事件。

3

在Windows上,可以使用以下解决方法,而不是使用os.remove():

import os

file = "D:\\temp\\test.pdf"
if os.path.exists(file):
    try:
        os.rename(file,file+"_")
        print "Access on file \"" + str(file) +"\" is available!"
        os.rename(file+"_",file)
    except OSError as e:
        message = "Access-error on file \"" + str(file) + "\"!!! \n" + str(e)
        print message

9
这里存在竞态条件。如果用户在第一次重命名后中断程序(ctrl-c),则文件名将无法恢复,而用户也不会意识到此情况。至少应该将两个重命名操作配对在一起,然后再进行打印。这样可以最大限度地减小危险窗口。os.rename(---); os.rename(---); print "Access ---" 你还应该捕获KeyboardInterrupt和SystemExit异常,以便在应用程序退出之前尝试恢复文件名。 - Noah Spurrier

1

Windows 上,您还可以通过利用 NTDLL/KERNEL32 Windows API 直接检索信息。以下代码返回 PID 列表,以防文件仍由进程打开/使用(包括您自己,如果您有文件的打开句柄):

import ctypes
from ctypes import wintypes

path = r"C:\temp\test.txt"

# -----------------------------------------------------------------------------
# generic strings and constants
# -----------------------------------------------------------------------------

ntdll = ctypes.WinDLL('ntdll')
kernel32 = ctypes.WinDLL('kernel32', use_last_error=True)

NTSTATUS = wintypes.LONG

INVALID_HANDLE_VALUE = wintypes.HANDLE(-1).value
FILE_READ_ATTRIBUTES = 0x80
FILE_SHARE_READ = 1
OPEN_EXISTING = 3
FILE_FLAG_BACKUP_SEMANTICS = 0x02000000

FILE_INFORMATION_CLASS = wintypes.ULONG
FileProcessIdsUsingFileInformation = 47

LPSECURITY_ATTRIBUTES = wintypes.LPVOID
ULONG_PTR = wintypes.WPARAM


# -----------------------------------------------------------------------------
# create handle on concerned file with dwDesiredAccess == FILE_READ_ATTRIBUTES
# -----------------------------------------------------------------------------

kernel32.CreateFileW.restype = wintypes.HANDLE
kernel32.CreateFileW.argtypes = (
    wintypes.LPCWSTR,      # In     lpFileName
    wintypes.DWORD,        # In     dwDesiredAccess
    wintypes.DWORD,        # In     dwShareMode
    LPSECURITY_ATTRIBUTES,  # In_opt lpSecurityAttributes
    wintypes.DWORD,        # In     dwCreationDisposition
    wintypes.DWORD,        # In     dwFlagsAndAttributes
    wintypes.HANDLE)       # In_opt hTemplateFile
hFile = kernel32.CreateFileW(
    path, FILE_READ_ATTRIBUTES, FILE_SHARE_READ, None, OPEN_EXISTING,
    FILE_FLAG_BACKUP_SEMANTICS, None)
if hFile == INVALID_HANDLE_VALUE:
    raise ctypes.WinError(ctypes.get_last_error())


# -----------------------------------------------------------------------------
# prepare data types for system call
# -----------------------------------------------------------------------------

class IO_STATUS_BLOCK(ctypes.Structure):
    class _STATUS(ctypes.Union):
        _fields_ = (('Status', NTSTATUS),
                    ('Pointer', wintypes.LPVOID))
    _anonymous_ = '_Status',
    _fields_ = (('_Status', _STATUS),
                ('Information', ULONG_PTR))


iosb = IO_STATUS_BLOCK()


class FILE_PROCESS_IDS_USING_FILE_INFORMATION(ctypes.Structure):
    _fields_ = (('NumberOfProcessIdsInList', wintypes.LARGE_INTEGER),
                ('ProcessIdList', wintypes.LARGE_INTEGER * 64))


info = FILE_PROCESS_IDS_USING_FILE_INFORMATION()

PIO_STATUS_BLOCK = ctypes.POINTER(IO_STATUS_BLOCK)
ntdll.NtQueryInformationFile.restype = NTSTATUS
ntdll.NtQueryInformationFile.argtypes = (
    wintypes.HANDLE,        # In  FileHandle
    PIO_STATUS_BLOCK,       # Out IoStatusBlock
    wintypes.LPVOID,        # Out FileInformation
    wintypes.ULONG,         # In  Length
    FILE_INFORMATION_CLASS)  # In  FileInformationClass

# -----------------------------------------------------------------------------
# system call to retrieve list of PIDs currently using the file
# -----------------------------------------------------------------------------
status = ntdll.NtQueryInformationFile(hFile, ctypes.byref(iosb),
                                      ctypes.byref(info),
                                      ctypes.sizeof(info),
                                      FileProcessIdsUsingFileInformation)
pidList = info.ProcessIdList[0:info.NumberOfProcessIdsInList]
print(pidList)

0

我提供了一种解决方案。请看以下代码。

def isFileinUsed(ifile):
    widlcard = "/proc/*/fd/*"
    lfds = glob.glob(widlcard)
    for fds in lfds:
        try:
            file = os.readlink(fds)
            if file == ifile:
                return True            
        except OSError as err:
            if err.errno == 2:     
                file = None
            else:
                raise(err)
    return False

你可以使用这个函数来检查文件是否正在被使用。
注意:这个解决方案只能在Linux系统中使用。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接