如何在Python中异步处理XML?

7

我有一个大型的XML数据文件(>160M)要处理,似乎使用SAX / expat / pulldom解析是正确的方式。 我想要一个线程来筛选节点并将待处理节点推送到队列中,然后其他工作线程从队列中取出下一个可用节点并处理。

我有以下代码(我知道应该加锁 - 以后会添加)

import sys, time
import xml.parsers.expat
import threading

q = []

def start_handler(name, attrs):
    q.append(name)

def do_expat():
    p = xml.parsers.expat.ParserCreate()
    p.StartElementHandler = start_handler
    p.buffer_text = True
    print("opening {0}".format(sys.argv[1]))
    with open(sys.argv[1]) as f:
        print("file is open")
        p.ParseFile(f)
        print("parsing complete")


t = threading.Thread(group=None, target=do_expat)
t.start()

while True:
    print(q)
    time.sleep(1)

问题在于while块的主体仅调用一次,然后我甚至无法使用ctrl-C中断它。在较小的文件上,输出结果符合预期,但这似乎表明处理程序仅在完全解析文档时才被调用,这似乎违背了SAX解析器的目的。

我确定这是我的无知,但我不知道我犯了什么错误。

PS:我还尝试通过更改start_handler来解决问题:

def start_handler(name, attrs):
    def app():
        q.append(name)
    u = threading.Thread(group=None, target=app)
    u.start()

没有爱情,只有技术。
4个回答

8
我对这个问题不是很确定。我猜测调用ParseFile是阻塞的,只有解析线程由于GIL而被运行。解决这个问题的方法是使用multiprocessing。无论如何,它都是与队列一起工作的。
您可以创建一个Process并传递一个Queue
import sys, time
import xml.parsers.expat
import multiprocessing
import Queue

def do_expat(q):
    p = xml.parsers.expat.ParserCreate()

    def start_handler(name, attrs):
        q.put(name)

    p.StartElementHandler = start_handler
    p.buffer_text = True
    print("opening {0}".format(sys.argv[1]))
    with open(sys.argv[1]) as f:
        print("file is open")
        p.ParseFile(f)
        print("parsing complete")

if __name__ == '__main__':
    q = multiprocessing.Queue()
    process = multiprocessing.Process(target=do_expat, args=(q,))
    process.start()

    elements = []
    while True:
        while True:
            try:
                elements.append(q.get_nowait())
            except Queue.Empty:
                break

        print elements
        time.sleep(1)

我已经包含了一个元素列表,只是为了复制你的原始脚本。你最终的解决方案可能会使用get_nowaitPool或类似的东西。

1
是的,这是一个不错的选择 - 就像你说的,你想要使用队列。 - Bandi-T
我尝试了那段代码,它避免了死锁,但是ParseFile似乎仍然不会输出任何内容,直到读取了整个输入。 - decitrig

8

ParseFile,正如你所注意到的那样,它只是“一口气”吞下所有内容--对于你想要进行的增量解析没有好处! 因此,只需逐步将文件馈送给解析器,确保在进行操作时有条件地将控制权移交给其他线程--例如:

while True:
  data = f.read(BUFSIZE)
  if not data:
    p.Parse('', True)
    break
  p.Parse(data, False)
  time.sleep(0.0)
time.sleep(0.0)调用是Python的一种方式,表示“如果有其他线程准备好并等待,请让出执行权”;Parse方法在此处文档中有说明。
第二点是,对于这种情况,请忘记锁!--改用Queue.Queue,它本质上是线程安全的,几乎总是协调Python中多个线程的最佳和最简单的方法。只需创建一个Queue实例q,将其放入q.put(name),并使工作线程阻塞在q.get()上等待获取更多工作 -- 如此简单!
(当没有更多工作需要完成时,您可以使用几种辅助策略来协调工作线程的终止,但是在没有特殊要求的情况下,最简单的方法是将它们设置为守护线程,因此它们将在主线程终止时全部终止 -- 请参见文档)。

对于队列建议投赞成票,但是您确定ParseFile会一次性将所有内容都读入内存吗?它确实会回调Python处理程序以便在解析时处理标签,这就是SAX解析的全部目的...或者您是在说这还不足以触发Python中的线程切换? - Bandi-T
1
如果您想使用SAX,可以使用xml.sax,请参见http://docs.python.org/library/xml.sax.html?highlight=sax#module-xml.sax;OP没有使用SAX,而是使用xml.parsers.expat,这是一个较低抽象接口,不会强制实施增量策略(它支持它,但不会强制实施,让Python代码级别自行选择)。 - Alex Martelli
选择使用expat有些随意,我找不到一个好的解释来说明expat和sax之间的区别。sax模块同样有效 - 甚至可能更好,因为它似乎是我需要的异步的。无论如何,我最终采用了“一次喂入一块”的方法,因为这给了我一个机会在解析器处理字符串之前对其进行消毒。非常有帮助的答案,谢谢。 - decitrig
@slide_rule,不用谢 - 是的,SAX 强制使用异步(事件驱动)方式,并允许您使用不同的底层解析器(expat 是 Python 自带的一个,但您可以安装其他解析器,例如那些根据 XML Schema 进行验证等)。 - Alex Martelli

1
我唯一看到的问题是你正在同时从不同的线程访问q - 没有锁定,正如你所写的那样。这是在寻求麻烦 - 你可能会遇到Python解释器锁定的问题。 :)
尝试锁定,这真的不是很困难:
import sys, time
import xml.parsers.expat
import threading

q = []
q_lock = threading.Lock() <---

def start_handler(name, attrs):
    q_lock.acquire() <---
    q.append(name)
    q_lock.release() <---

def do_expat():
    p = xml.parsers.expat.ParserCreate()
    p.StartElementHandler = start_handler
    p.buffer_text = True
    print("opening {0}".format(sys.argv[1]))
    with open(sys.argv[1]) as f:
        print("file is open")
        p.ParseFile(f)
        print("parsing complete")


t = threading.Thread(group=None, target=do_expat)
t.start()

while True:
    q_lock.acquire() <---
    print(q)
    q_lock.release() <---
    time.sleep(1)

你看,这很简单,我们只需创建一个锁变量来保护我们的对象,在每次使用对象之前获取该锁,并在完成对象上的任务后每次释放。这样我们就保证了q.append(name)永远不会与print(q)重叠。
(在较新版本的Python中,还有一种“with...”语法,可以帮助您不释放锁定或关闭文件或其他经常被遗忘的清理工作。)

0

关于实现细节我并不是很清楚,但如果解析器是执行 C 代码直到完成的话,其他 Python 线程将无法运行。如果解析器在回调 Python 代码,GIL 可能会被释放以便其他线程运行,但我对此并不确定。你可能需要检查这些细节。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接