你可以尝试使用带有
r
标志的
lockfile-create命令,重试指定次数以捕获
CalledProcessError
并退出,
-p
标志将存储进程的
pid
:
import os
import sys
from time import sleep
from subprocess import check_call, CalledProcessError
try:
check_call(["lockfile-create", "-q","-p", "-r", "0", "-l", "my.lock"])
except CalledProcessError as e:
print("{} is already running".format(sys.argv[0]))
print(e.returncode)
exit(1)
for i in range(10):
sleep(2)
print(1)
check_call(["rm","-f","my.lock"])
运行上述代码中的
test.py
脚本时,如果已经有一个脚本在运行,则会输出以下内容:
$ python lock.py
lock.py is already running
4
选项
-q, --quiet
抑制输出,仅通过退出状态表示成功或失败。
-v, --verbose
启用诊断输出。
-l, --lock-name
不要在文件名后添加.lock。此选项适用于lockfile-create、lockfile-remove、lockfile-touch或lockfile-check。
-p, --use-pid
每当创建锁定文件时,将当前进程ID(PID)写入锁定文件,并在检查锁定的有效性时使用该pid。有关更多信息,请参见lockfile_create(3)手册页。此选项适用于lockfile-create、lockfile-remove、lockfile-touch和lockfile-check。
-o, --oneshot
触摸锁并立即退出。此选项适用于lockfile-touch和mail-touchlock。如果未提供这些命令将永远运行,每分钟触摸一次锁定,直到被终止。
-r retry-count, --retry retry-count
尝试在放弃之前锁定filename retry-count 次。每次尝试都会延迟比上一次长一些时间(以5秒的增量),直到重试之间的最大延迟达到1分钟为止。如果未指定retry-count,则默认值为9,如果所有9个锁定尝试失败,则在180秒(3分钟)后放弃。
描述
lockfile_create函数以NFS安全的方式创建锁定文件。
如果flags设置为L_PID,则lockfile_create不仅会检查现有的锁定文件,而且还会读取其内容以查看是否包含ASCII进程ID。如果是,则仅当该进程仍然存在时,锁定文件才有效。
如果锁定文件在共享文件系统上,则可能是由远程主机上的进程创建的。因此,进程ID检查是无用的,不应设置L_PID标志。在这种情况下,没有好方法可以确定锁定文件是否过期。因此,如果锁定文件旧于5分钟,则将其删除。这就是为什么提供了lockfile_touch函数:在保持锁定的同时,需要通过调用lockfile_touch()定期刷新它(每隔一分钟左右)。
lockfile_check函数在不尝试创建新的锁定文件的情况下检查是否已存在有效的锁定文件。
最后,lockfile_remove函数删除锁定文件。
算法
用于以原子方式创建锁定文件(甚至在NFS上)的算法如下:
1
创建一个唯一的文件。在printf格式中,文件名为.lk%05d%x%s。第一个参数(%05d)是当前进程ID。第二个参数(%x)由time(2)返回值的4位副本组成。最后一个参数是系统主机名。
2
然后使用link(2)创建锁定文件。忽略link的返回值。
3
现在stat()锁定文件。如果stat失败,则跳转到步骤6。
4
将锁定文件的stat值与临时文件的stat值进行比较。如果它们相同,则获得了锁。删除临时文件并向调用者返回0(成功)。
5
检查现有的锁定文件是否有效。如果无效,则删除旧的锁定文件。
6
重试之前,我们暂停n秒。 n最初为5秒,但每次重试都会增加5秒,最多增加60秒(递增退避)。然后我们继续执行步骤2,直到达到重试次数。
似乎在Redhat上有一个称为lockfile-progs的等效软件包。lockfile-progs。
在Mac上,您可以使用lockfile并执行以下操作:
import os
import sys
from time import sleep
import os
from subprocess import Popen, CalledProcessError, check_call
p = Popen(["lockfile", "-r", "0", "my.lock"])
p.wait()
if p.returncode == 0:
with open("my.pid", "w") as f:
f.write(str(os.getpid()))
else:
try:
with open("my.pid") as f:
r = f.read()
check_call(["kill", "-0", "{}".format(r)])
except CalledProcessError:
check_call(["rm", "-f", "my.lock"])
check_call(["lockfile", "-r", "0", "my.lock"])
with open("my.pid", "w") as out:
out.write(str(os.getpid()))
print("Deleted stale lockfile.")
else:
print("{} is already running".format(sys.argv[0]))
print(p.returncode)
exit(1)
for i in range(10):
sleep(1)
print(1)
check_call(["rm", "-f", "my.lock"])
在您的情况下,也许使用套接字会起作用:
from socket import socket, gethostname, error, SO_REUSEADDR, SOL_SOCKET
from sys import argv
import errno
sock = socket()
host = gethostname()
port = 60001
sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
try:
sock.bind((host, port))
sock.connect((host, port))
except error as e:
if e.errno == errno.EADDRNOTAVAIL:
print("{} is already running".format(argv[0]))
exit(1)
else:
raise e
from time import sleep
while True:
print(1)
sleep(2)
sock.close()