如何使用 Python 编写程序计算档案中文件的数量

15

我维护的程序中,它的做法如下:

# count the files in the archive
length = 0
command = ur'"%s" l -slt "%s"' % (u'path/to/7z.exe', srcFile)
ins, err = Popen(command, stdout=PIPE, stdin=PIPE,
                 startupinfo=startupinfo).communicate()
ins = StringIO.StringIO(ins)
for line in ins: length += 1
ins.close()
  1. 这是真的唯一的方法吗?我似乎找不到其他命令,但我觉得只询问文件数量好像有点奇怪。
  2. 错误检查呢?将其修改为以下内容是否足够:

    proc = Popen(command, stdout=PIPE, stdin=PIPE,
                 startupinfo=startupinfo)
    out = proc.stdout
    # ... count
    returncode = proc.wait()
    if returncode:
        raise Exception(u'Failed reading number of files from ' + srcFile)
    

    我应该解析Popen的输出吗?

  3. 编辑:对7z、rar、zip档案感兴趣(这些都被7z.exe支持)- 但对于入门者来说,7z和zip就足够了。


1
你应该支持哪种类型的归档? - Loïc Faure-Lacroix
2
有关zip和tar,请查看https://docs.python.org/2/library/zipfile.html和https://docs.python.org/2/library/tarfile.html。 - Loïc Faure-Lacroix
@LoïcFaure-Lacroix:谢谢-已编辑。我肯定需要7z... - Mr_and_Mrs_D
也许可以看看这个? https://github.com/fancycode/pylzma/blob/master/py7zlib.pypy7zlib 应该能够读取存档。之后,您可以使用类似于 zipfile 或 tarfile 的东西来提取内部的名称(py7zlib.Archive7z.getnames)。 - Alex Huszagh
2个回答

16

在Python中计算zip归档文件中归档成员的数量:

#!/usr/bin/env python
import sys
from contextlib import closing
from zipfile import ZipFile

with closing(ZipFile(sys.argv[1])) as archive:
    count = len(archive.infolist())
print(count)

如果可用的话,它可能会使用zlibbz2lzma模块来解压缩存档。


要计算tar存档中常规文件的数量:

#!/usr/bin/env python
import sys
import tarfile

with tarfile.open(sys.argv[1]) as archive:
    count = sum(1 for member in archive if member.isreg())
print(count)

根据Python版本的不同,它可能支持gzipbz2lzma压缩。

您可以找到第三方模块来提供类似于7z归档的功能。


使用7z工具获取存档文件中的文件数:

import os
import subprocess

def count_files_7z(archive):
    s = subprocess.check_output(["7z", "l", archive], env=dict(os.environ, LC_ALL="C"))
    return int(re.search(br'(\d+)\s+files,\s+\d+\s+folders$', s).group(1))

以下是一个版本,如果存档中有很多文件,可能会使用更少的内存:

import os
import re
from subprocess import Popen, PIPE, CalledProcessError

def count_files_7z(archive):
    command = ["7z", "l", archive]
    p = Popen(command, stdout=PIPE, bufsize=1, env=dict(os.environ, LC_ALL="C"))
    with p.stdout:
        for line in p.stdout:
            if line.startswith(b'Error:'): # found error
                error = line + b"".join(p.stdout)
                raise CalledProcessError(p.wait(), command, error)
    returncode = p.wait()
    assert returncode == 0
    return int(re.search(br'(\d+)\s+files,\s+\d+\s+folders', line).group(1))

例子:

import sys

try:
    print(count_files_7z(sys.argv[1]))
except CalledProcessError as e:
    getattr(sys.stderr, 'buffer', sys.stderr).write(e.output)
    sys.exit(e.returncode)
为了计算通用子进程输出中的行数:
from functools import partial
from subprocess import Popen, PIPE, CalledProcessError

p = Popen(command, stdout=PIPE, bufsize=-1)
with p.stdout:
    read_chunk = partial(p.stdout.read, 1 << 15)
    count = sum(chunk.count(b'\n') for chunk in iter(read_chunk, b''))
if p.wait() != 0:
    raise CalledProcessError(p.returncode, command)
print(count)

它支持无限的输出。


你可以解释一下为什么buffsize = -1(而不是在您以前的答案stackoverflow.com/a/30984882/281545中为buffsize = 1)吗?

bufsize = -1 意味着在Python 2上使用默认的I/O缓冲区大小,而不是bufsize = 0(无缓冲)。它是Python 2上的性能提升。它是最近的Python 3版本的默认值。如果将bufsize未更改为bufsize = -1,则可能会出现短读取(丢失数据)的情况,这在一些早期的Python 3版本中是存在的。

此答案分块读取数据,因此流完全缓冲以提高效率。您链接的解决方案 是面向行的,bufsize = 1意味着它是“行缓冲”。否则与bufsize = -1相比几乎没有区别。

那么,read_chunk = partial(p.stdout.read, 1 << 15) 对我们来说有什么作用?

它相当于 read_chunk = lambda: p.stdout.read(1<<15),但通常提供更多的内省信息。它用于有效实现Python中的“wc -l”


嘿,谢谢!你能解释一下为什么buffsize=-1(而不是你之前回答中的buffsize=1:https://dev59.com/ZF0Z5IYBdhLWcg3w3Dbk#30984882),还有`read_chunk = partial(p.stdout.read, 1 << 15)是什么意思吗?实际上,这个buffsize对我来说是个谜(也是我在谷歌上尝试过的)。与此同时,由于我已经捆绑了7z.exe`(我想要显示确切的错误信息),所以我认为我会采用我的答案(除非我做了什么明显愚蠢的事情)。 - Mr_and_Mrs_D
@Mr_and_Mrs_D:我已经添加了代码示例,展示如何使用7z工具获取文件数量,并在必要时收集错误消息。 - jfs
优秀(这就是Hettinger在视频中所说的大猩猩 - 我一直在使用正则表达式进行匹配,而不是简单地解析最后一行 - 当然我想到了,但忙于我的正则表达式)。我本来打算使用我的版本(没有时间测试),但_我简直无法抗拒正确性_ - 将使用错误检查版本(没有时间检查check_output) - 最后一个问题 - 我需要-scsUTF-8 -sccUTF-8并使用u''还是应该按原样处理?快速测试表明存档中的Unicode名称没有区别,但仍然... - Mr_and_Mrs_D
1
@Mr_and_Mrs_D:所有代码都应该按原样工作,即不需要“-scsUTF-8 -sccUTF-8”。注意:基于check_output()的版本可能比使用Popen()count_files_7z()使用更多的内存,但错误处理是相同的--您可以在两个count_files_7z()实现中运行示例--尽管第二个变体不会存储输出,直到遇到错误(这就是为什么它使用更少的内存)。 - jfs
1
@Mr_and_Mrs_D:否则,您可能会收到另一种语言(取决于您的区域设置)的消息,并且使用英语单词“files”、“folders”的正则表达式可能会失败。 - jfs
显示剩余3条评论

1

因为我已经将7z.exe捆绑到应用程序中,而且我肯定想避免使用第三方库,但我确实需要解析rar和7z存档,所以我会选择:

regErrMatch = re.compile(u'Error:', re.U).match # needs more testing
r"""7z list command output is of the form:
   Date      Time    Attr         Size   Compressed  Name
------------------- ----- ------------ ------------  ------------------------
2015-06-29 21:14:04 ....A       <size>               <filename>
where ....A is the attribute value for normal files, ....D for directories
"""
reFileMatch = re.compile(ur'(\d|:|-|\s)*\.\.\.\.A', re.U).match

def countFilesInArchive(srcArch, listFilePath=None):
    """Count all regular files in srcArch (or only the subset in
    listFilePath)."""
    # https://dev59.com/510Z5IYBdhLWcg3wvCWd
    command = ur'"%s" l -scsUTF-8 -sccUTF-8 "%s"' % ('compiled/7z.exe', srcArch)
    if listFilePath: command += u' @"%s"' % listFilePath
    proc = Popen(command, stdout=PIPE, startupinfo=startupinfo, bufsize=-1)
    length, errorLine = 0, []
    with proc.stdout as out:
        for line in iter(out.readline, b''):
            line = unicode(line, 'utf8')
            if errorLine or regErrMatch(line):
                errorLine.append(line)
            elif reFileMatch(line):
                length += 1
    returncode = proc.wait()
    if returncode or errorLine: raise StateError(u'%s: Listing failed\n' + 
        srcArch + u'7z.exe return value: ' + str(returncode) +
        u'\n' + u'\n'.join([x.strip() for x in errorLine if x.strip()]))
    return length

Python Popen - wait vs communicate vs CalledProcessError 中,@JFSebastien 提到的错误检查


基于已接受的答案,我的最终版本(大致)- 可能不需要Unicode,但目前仍然保留它,因为我在任何地方都使用它。同样保留了正则表达式(我可能会扩展它,我见过像 re.compile(u'^(Error:.+|.+ Data Error?|Sub items Errors:.+)',re.U) 这样的东西)。将不得不研究check_output和CalledProcessError。

def countFilesInArchive(srcArch, listFilePath=None):
    """Count all regular files in srcArch (or only the subset in
    listFilePath)."""
    command = [exe7z, u'l', u'-scsUTF-8', u'-sccUTF-8', srcArch]
    if listFilePath: command += [u'@%s' % listFilePath]
    proc = Popen(command, stdout=PIPE, stdin=PIPE, # stdin needed if listFilePath
                 startupinfo=startupinfo, bufsize=1)
    errorLine = line = u''
    with proc.stdout as out:
        for line in iter(out.readline, b''): # consider io.TextIOWrapper
            line = unicode(line, 'utf8')
            if regErrMatch(line):
                errorLine = line + u''.join(out)
                break
    returncode = proc.wait()
    msg = u'%s: Listing failed\n' % srcArch.s
    if returncode or errorLine:
        msg += u'7z.exe return value: ' + str(returncode) + u'\n' + errorLine
    elif not line: # should not happen
        msg += u'Empty output'
    else: msg = u''
    if msg: raise StateError(msg) # consider using CalledProcessError
    # number of files is reported in the last line - example:
    #                                3534900       325332  75 files, 29 folders
    return int(re.search(ur'(\d+)\s+files,\s+\d+\s+folders', line).group(1))

会编辑这个并记录我的发现。

1
你可以在这里使用for line in out:或更好的是for line in io.TextIOWrapper(out, encoding='utf-8'):(将字节解码为Unicode并启用通用换行模式)。不要使用if len(container),而应改用if container(Python中空容器为False)。可以使用line.startswith('Error:')代替regErrMatch正则表达式。你确定7z将其错误打印到标准输出吗(这很不幸)?请遵循PEP-8命名约定,除非你有特殊原因不这样做。 - jfs
是的,7z将其输出打印在stdout(...)- TextIOWrapper中。我会看一下。regErrMatch:我可能需要详细说明有关错误的正则表达式。PEP8-这是遗留代码,慢慢地PEP8'ing它(另请参见:https://www.youtube.com/watch?v=wf-BqAjZb8M-尽管79个字符,但我完全同意)。 - Mr_and_Mrs_D

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接