Efficiently merging large XML files with Python


I have around 200 XML files ranging from 5 MB to 50 MB in size, 80% of which are under 10 MB. The files contain many elements, with both overlapping and unique data. My goal is to merge all of these files by performing a logical union over all elements.

The code seems to work, but it slows down exponentially as it processes more files. For example, the first 5 files take about 20 seconds, the next five take about a minute, the five after that about 5 minutes, and so on, while consuming noticeably more memory than the combined size of all the files. As I type this, the whole process has been running for 4 hours.

Obviously, some of this is "expected", given that lookups have to run against an ever-growing tree. Still, I wonder whether there is a way to at least mitigate the effect.

I tried implementing a simple form of caching, but I didn't notice any significant improvement.

I also tried multiprocessing, which does help, but it adds extra complexity and pushes the problem to the hardware level, which doesn't feel like a real optimization. A minimal sketch of that split is shown below.
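For reference, a minimal sketch of what such a multiprocessing split could look like (assuming the process_elements function from the code below; the chunking strategy and the final merge over the temp files are simplifications, not the exact code used):

import os
from multiprocessing import Pool

# Sketch only: split the input files across workers; each worker runs
# process_elements (defined below) with a non-negative identifier, so it
# writes its partial union to temp{identifier}.xml. The partial results
# still need a final, single-process union pass afterwards.
def merge_in_parallel(nprocs: int = 4) -> None:
    files = os.listdir("data")
    chunks = [files[i::nprocs] for i in range(nprocs)]
    with Pool(nprocs) as pool:
        pool.starmap(process_elements,
                     [(chunk, i) for i, chunk in enumerate(chunks)])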

Is there any way to improve the performance?

Note: for confidentiality reasons I had to obfuscate parts of the code and data. Please don't hesitate to let me know if that breaks the example.

Code:

import os
import time

import lxml.etree
from lxml.etree import Element

# Edit 2: added timing prints

def process_elements(files: list[str],
                     identifier: int) -> lxml.etree._Element | None:

    base_el = Element('BASE')
    i = 0
    cache = {}  # Edit 1: missed this line

    start = time.time()
    time_spent_reading = 0
    lookup_time = [0, 0]            # [total seconds, number of lookups]
    append_new_el_time = [0, ]
    cache_search_time = [0, 0]      # [total seconds, number of cache hits]
    recursive_calls_counter = [0, ]

    for file in files:
        i += 1
        print(f"Process: {identifier}, File {i} of {len(files)}: {file}")
        print("Reading file...")
        start_read = time.time()

        tree = lxml.etree.parse(f'data/{file}').getroot()
        time_spent_reading += time.time() - start_read
        print(f"Reading file took {time.time() - start_read} seconds")
        print("Since start: ", time.time() - start)

        packages = tree.find('BASE')

        print("Starting walk...")
        start_walk = time.time()
        for package in packages:
            walk(package, base_el, cache, lookup_time,
                 append_new_el_time, cache_search_time,
                 recursive_calls_counter)

        print(f"Walk took {time.time() - start_walk} seconds")
        print("Since start: ", time.time() - start)

    if identifier == -1:
        return base_el
    else:
        print("Timing results:")
        print("Time spent reading: ", time_spent_reading)
        print("Time spent on lookup: ", lookup_time[0])
        print("Time spent on append: ", append_new_el_time[0])
        print("Time spent on cache search: ", cache_search_time[0])

        base_el.getroottree().write(
            f'temp{identifier}.xml', encoding='utf-8')
        return None


def walk(element: lxml.etree._Element,
         reference: lxml.etree._Element,
         cache: dict,
         lookup_time,
         append_new_el_time,
         cache_search_time,
         recursive_calls_counter) -> None:

    recursive_calls_counter[0] += 1

    children = element.iterchildren()
    elid = f"{element.tag}"
    element_name = element.get('some-id-i-need')
    if element_name is not None:
        elid += f'[@some-id-i-need="{element_name}"]'

    cache_id = str(id(reference)) + "_" + elid

    cache_search_time_start = time.time()
    relevant_data = cache.get(cache_id)
    cache_search_time[0] += time.time() - cache_search_time_start

    # If the element is found either in the cache or in the merged tree,
    # continue to its children. Otherwise the element does not exist in
    # the merged tree: add it there and to the cache.

    if relevant_data is None:
        # I believe this lookup may be what takes the most time,
        # hence my attempt to cache it
        lookup_time_start = time.time()
        relevant_data = reference.find(elid)
        lookup_time[0] += time.time() - lookup_time_start
        lookup_time[1] += 1
    else:
        # cache hit
        cache_search_time[1] += 1

    if relevant_data is None:
        append_new_el_time_start = time.time()
        reference.append(element)
        append_new_el_time[0] += time.time() - append_new_el_time_start
        return
    else:
        cache.setdefault(cache_id, relevant_data)
        # if the element has no children, the loop simply does not run
        for child in children:
            walk(child, relevant_data, cache, lookup_time,
                 append_new_el_time,
                 cache_search_time,
                 recursive_calls_counter)


# to run: process_elements(os.listdir("data"), -1)

Sample data:

File 1

<BASE>
    <elem id="1">
        <data-tag id="1">
            <object id="23124">
                <POS Tag="V" />
                <grammar type="STEM" />
                <Aspect type="IMPV" />
                <Number type="S" />
            </object>
            <object id="128161">
                <POS Tag="V" />
                <grammar type="STEM" />
                <Aspect type="IMPF" />
            </object>
        </data-tag>

    </elem>
</BASE>

File 2

<BASE>
    <elem id="1">
        <data-tag id="1">
            <object id="23124">

                <concept type="t1" />
            </object>
            <object id="128161">

                <concept type="t2" />
            </object>
        </data-tag>
        <data-tag id="2">
            <object id="128162">
                <POS Tag="P" />
                <grammar type="PREFIX" />
                <Tag Tag="bi+" />
                <concept type="t3" />
            </object>
        </data-tag>
    </elem>
</BASE>

Result:

<BASE>
    <elem id="1">
        <data-tag id="1">
            <object id="23124">
                <POS Tag="V" />
                <grammar type="STEM" />
                <Aspect type="IMPV" />
                <Number type="S" />
                <concept type="t1" />
            </object>
            <object id="128161">
                <POS Tag="V" />
                <grammar type="STEM" />
                <Aspect type="IMPF" />
                <concept type="t2" />
            </object>
        </data-tag>
        <data-tag id="2">
            <object id="128162">
                <POS Tag="P" />
                <grammar type="PREFIX" />
                <Tag Tag="bi+" />
                <concept type="t3" />
            </object>
        </data-tag>
    </elem>
</BASE>

Edit 2: timing results after processing 10 files (~60 MB, in 1 min 24.8 s):


Starting process...
Process: 102, File 1 of 10:
Reading file...
Reading file took 0.1326887607574463 seconds
Since start:  0.1326887607574463
preprocesing...
merging...
Starting walk...
Walk took 0.8433401584625244 seconds
Since start:  1.0600318908691406
Process: 102, File 2 of 10:
Reading file...
Reading file took 0.04700827598571777 seconds
Since start:  1.1070401668548584
preprocesing...
merging...
Starting walk...
Walk took 1.733034610748291 seconds
Since start:  2.8680694103240967
Process: 102, File 3 of 10:
Reading file...
Reading file took 0.041702985763549805 seconds
Since start:  2.9097723960876465
preprocesing...
merging...
...
Time spent on lookup:  79.53011083602905
Time spent on append:  1.1502337455749512
Time spent on cache search:  0.11017322540283203
Cache size:  30176

# Edit 3: extra data
Number of cache hits:  112503
Cache size:  30177
Number of recursive calls:  168063


As an observation, I do expect overlap between files; maybe the small cache-search time indicates a problem with how I implemented the cache?

Edit 3: it looks like I am indeed getting plenty of hits. Strangely, though, commenting out the cache lookup makes almost no difference to performance. In fact it runs slightly faster without the cache (though I'm not sure whether a few seconds is a significant difference or just random chance in this case).

relevant_data = None  # cache.get(cache_id)

Logs with the cache commented out:

Time spent on lookup:  71.13456320762634
Number of lookups:  168063
Time spent on append:  3.9656710624694824
Time spent on cache search:  0.020023584365844727
Number of cache hits:  0
Cache size:  30177
Number of recursive calls:  168063
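As a sanity check on those lookup numbers: Element.find appears to scan the parent's children linearly, so each miss costs more as the merged tree grows, and the total work becomes roughly quadratic. A small standalone check along these lines (the tag and attribute names are made up):

import time
from lxml.etree import Element, SubElement

# Time find() for the last of n children, for growing n. If the time grows
# linearly with n, the merge as a whole does quadratic work.
for n in (1_000, 10_000, 100_000):
    parent = Element('BASE')
    for i in range(n):
        SubElement(parent, 'item', {'some-id-i-need': str(i)})
    t0 = time.time()
    parent.find(f'item[@some-id-i-need="{n - 1}"]')  # worst case: last child
    print(n, time.time() - t0)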

Have you found what is taking up all the time? You could add print statements or logging that show the current time, or the time elapsed since the previous print. It could be the writing back out to XML and reading it in again that is slowing things down. - Paul Rooney
@PaulRooney After a few more runs I edited the question to show the timing logs. - Petru Tanas
I just heard about https://github.com/plasma-umass/scalene for profiling code, it might help find the hotspots. - Sam Mason
1 Answer


Caching all identifiers as you go seems to work well, and it doesn't slow down noticeably as more data is added.

The following code does this:

def xml_union(files, loader):
    existing = {}
    path = []

    def populatewalk(elem):
        pid = elem.get('id')
        ident = (elem.tag, pid)
        path.append(ident)
        if pid is not None:
            existing[tuple(path)] = elem
        for child in elem:
            populatewalk(child)
        popped = path.pop()
        assert popped is ident

    def walk(existing_parent, elem):
        pid = elem.get('id')
        
        if pid is None:
            existing_parent.append(elem)
            # make sure children are populated
            return populatewalk(elem)

        ident = (elem.tag, pid)
        path.append(ident)
        tpath = tuple(path)
        existing_elem = existing.get(tpath)
        if existing_elem is None:
            existing_parent.append(elem)
            existing[tpath] = elem
            for child in elem:
                populatewalk(child)
        else:
            existing_elem.attrib.update(elem.items())
            for child in elem:
                walk(existing_elem, child)

        popped = path.pop()
        assert popped is ident        

    first, *remain = files
    root = loader(first)
    for elem in root:
        populatewalk(elem)

    for text in remain:
        ri = loader(text)
        if root.tag != ri.tag:
            raise ValueError(f"root tag {root.tag!r} does not equal {ri.tag!r}")
        for elem in ri:
            walk(root, elem)
    
    return root

The above code assumes you always want to use the id attribute to identify elements, but that should be easy to change. It is also slightly more general in that it tracks the element hierarchy while performing the union, whereas your code seems to care only about whether an element with a given ID can be found anywhere. Not sure whether that matters!
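To make the path-tuple scheme concrete, here is a small standalone illustration (not part of the answer's code) of the keys populatewalk builds for File 1 above:

from lxml import etree

F1 = """<BASE>
    <elem id="1">
        <data-tag id="1">
            <object id="23124"/>
        </data-tag>
    </elem>
</BASE>"""

# One key per element that carries an id: the tuple of (tag, id) pairs on
# the path from just below the root down to that element. A dict lookup on
# such a tuple is O(1), unlike a linear find() over siblings.
def show_keys(elem, path=()):
    path = path + ((elem.tag, elem.get('id')),)
    if elem.get('id') is not None:
        print(path)
    for child in elem:
        show_keys(child, path)

for elem in etree.fromstring(F1):
    show_keys(elem)
# (('elem', '1'),)
# (('elem', '1'), ('data-tag', '1'))
# (('elem', '1'), ('data-tag', '1'), ('object', '23124'))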

It can be tested with the following lines, where f1 and f2 are set to strings containing the sample documents posted above:

from lxml import etree

print(etree.tostring(xml_union([f1, f2], etree.fromstring)).decode())

This didn't take long to write, but convincing myself it was correct and efficient took rather longer. In the end I wrote a test harness that generates 10 files of ~12 MiB each, runs the above code over them, writes the result out to an ~87 MiB file, and makes sure that file is exactly the union of what was generated. The part that uses xml_union looks like:

from time import time

def fileloader(path):
    print(f"loading {path}")
    return etree.parse(path).getroot()

t0 = time()
new_root = xml_union(
    [f'large-{i:02}.xml' for i in range(10)],
    fileloader,
)
t1 = time()
with open(f'merged.xml', 'wb') as fd:
    print("writing merged")
    etree.ElementTree(new_root).write(fd, pretty_print=True)
t2 = time()

print(f"union={t1-t0:.2f} write={t2-t1:.2f}")

My 1.6 GHz laptop takes about 23 seconds to merge these files, with no slowdown observed on the later files. Writing the resulting object out from Python takes about 2 seconds.

The test harness is more fiddly, and looks like:

from itertools import product
from random import choices

# Note: `terminals` was not defined in the original snippet; here it is
# assumed to be the set of leaf tag names from the question's sample data.
terminals = ['POS', 'grammar', 'Aspect', 'Number', 'Tag', 'concept']

def randomizer():
    num = 1
    def next(n, rb):
        nonlocal num
        for _ in range(n):
            yield num, choices(rb, k=len(terminals))
            num += 1
    return next

rootids = list(range(10))
roots = [etree.Element('BASE') for _ in rootids]
obj_elems = {}
dt_elems = {}
el_elems = {}

def get_obj(root_id, el_id, dt_id, obj_id):
    obj = obj_elems.get((root_id, obj_id))
    if obj is not None:
        return obj

    obj = obj_elems[(root_id, obj_id)] = etree.Element('object', id=str(obj_id))
    dt = dt_elems.get((root_id, dt_id))
    if dt is not None:
        dt.append(obj)
        return obj

    dt = dt_elems[(root_id, dt_id)] = etree.Element('data-tag', id=str(dt_id))
    dt.append(obj)
    el = el_elems.get((root_id, el_id))
    if el is not None:
        el.append(dt)
        return obj

    el = el_elems[(root_id, el_id)] = etree.Element('elem', id=str(el_id))
    el.append(dt)
    roots[root_id].append(el)
    return obj

elmaker = randomizer()
dtmaker = randomizer()
objmaker = randomizer()

for el_id, el_roots in elmaker(1000, rootids):
    for dt_id, dt_roots in dtmaker(100, el_roots):
        for obj_id, obj_roots in objmaker(len(terminals), dt_roots):
            for key, root_id in zip(terminals, obj_roots):
                get_obj(root_id, el_id, dt_id, obj_id).append(
                    etree.Element(key, an='val')
                )

for root_id, root in zip(rootids, roots):
    with open(f'large-{root_id:02}.xml', 'wb') as fd:
        et = etree.ElementTree(root)
        et.write(fd, pretty_print=True)

nelem = 1000
ndt = 100
nterm = len(terminals)

expected_elems = set(str(i+1) for i in range(nelem))
expected_dts = set(str(i+1) for i in range(nelem*ndt))
expected_objs = set(str(i+1) for i in range(nelem*ndt*nterm))
expected_terms = set(product(expected_objs, terminals))

elem_seen = set()
dt_seen = set()
obj_seen = set()
terms_seen = set()

def check(el, tag, seen):
    assert el.tag == tag
    aid = el.attrib['id']
    assert aid not in seen
    seen.add(aid)
    return aid

for elem in etree.parse('merged.xml').getroot():
    check(elem, 'elem', elem_seen)
    for dt in elem:
        check(dt, 'data-tag', dt_seen)
        for obj in dt:
            obj_id = check(obj, 'object', obj_seen)
            for term in obj:
                assert term.tag in terminals
                term_id = (obj_id, term.tag)
                assert term_id not in terms_seen
                terms_seen.add(term_id)

assert elem_seen == expected_elems
assert dt_seen == expected_dts
assert obj_seen == expected_objs
assert terms_seen == expected_terms

Hopefully the test harness is useful to someone else!


Just to clarify, "the code assumes you always want to use the id attribute to identify elements" -> is it built on the assumption that all elements have an id? - Petru Tanas
The code isn't very long, so it should be easy to see that it isn't! Also, the data generated by the test code doesn't always have an "id". - Sam Mason
