Python code to parse text from multiple txt files

15

Seeking advice on how to extract items from multiple text files to build a dictionary.

This text file: https://pastebin.com/Npcp3HCM

was manually converted into the desired data structure: https://drive.google.com/file/d/0B2AJ7rliSQubV0J2Z0d0eXF3bW8/view

There are thousands of such text files, and they may have different section headings, as shown in these examples:

  1. https://pastebin.com/wWSPGaLX
  2. https://pastebin.com/9Up4RWHu

I started by reading in the files.

from glob import glob

txtPth = '../tr-txt/*.txt'
txtFiles = glob(txtPth)

with open(txtFiles[0],'r') as tf:
    allLines = [line.rstrip() for line in tf]

sectionHeading = ['Corporate Participants',
                  'Conference Call Participants',
                  'Presentation',
                  'Questions and Answers']

for lineNum, line in enumerate(allLines):
    if line in sectionHeading:
        print(lineNum,allLines[lineNum])

My idea was to find the line numbers where the headings occur, then try to extract the content between those line numbers, and then strip out the separators (such as the dashes). That did not work, and I am stuck trying to create this type of dictionary, so that I can later run various natural-language-processing algorithms on the mined items:

{file-name1: {
    date-time: [string],
    corporate-name: [string],
    corporate-participants: [name1, name2, name3],
    call-participants: [name4, name5],
    section-headings: {
        heading1: [
            {name1: [speechOrderNum, text-content]},
            {name2: [speechOrderNum, text-content]},
            {name3: [speechOrderNum, text-content]}],
        heading2: [
            {name1: [speechOrderNum, text-content]},
            {name2: [speechOrderNum, text-content]},
            {name3: [speechOrderNum, text-content]},
            {name2: [speechOrderNum, text-content]},
            {name1: [speechOrderNum, text-content]},
            {name4: [speechOrderNum, text-content]}],
        heading3: [text-content],
        heading4: [text-content]
    }
}}
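For reference, the line-number approach from my snippet above can be completed with a sketch like this (the sample lines here are invented for illustration; it assumes the headings appear in document order):

```python
# Sketch: slice out the lines between consecutive section headings.
# 'allLines' and 'sectionHeading' play the same roles as in the snippet above.
allLines = ['Presentation', 'line a', 'line b',
            'Questions and Answers', 'line c']
sectionHeading = ['Corporate Participants',
                  'Conference Call Participants',
                  'Presentation',
                  'Questions and Answers']

# indices of all heading lines, plus a sentinel for the end of the file
idx = [i for i, line in enumerate(allLines) if line in sectionHeading]
idx.append(len(allLines))

# map each heading to the lines between it and the next heading
sections = {allLines[start]: allLines[start + 1:end]
            for start, end in zip(idx, idx[1:])}
print(sections)
# {'Presentation': ['line a', 'line b'], 'Questions and Answers': ['line c']}
```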
The challenge is that different files may have different headings and different numbers of headings. But there will always be a section called "Presentation", and very likely a "Questions and Answers" section. The section headings are always delimited by a string of equals signs. The content of different speakers is always delimited by a string of dash characters. The "speech order" in the Q&A section is indicated by a number in square brackets. The participants are always marked at the beginning of the document by an asterisk before their name, and their job title is always on the next line.
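The cues described above ('=' runs, '-' runs, the '[n]' speech order, '*'-prefixed participants) can each be captured by a regular expression. A hedged sketch (the sample transcript lines are invented for illustration):

```python
import re

sample = """\
================================
Corporate Participants
================================
 * John Doe
   Example Corp - CEO
--------------------------------
John Doe, Example Corp - CEO [1]
--------------------------------
Thank you all for joining.
"""

equals_run = re.compile(r"^=+\s*$", re.MULTILINE)          # section delimiter
dash_run = re.compile(r"^-+\s*$", re.MULTILINE)            # speaker delimiter
speech_order = re.compile(r"\[(\d+)\]")                    # '[n]' speech order
participant = re.compile(r"^\s*\*\s*(.+)$", re.MULTILINE)  # '*'-prefixed name

print(participant.findall(sample))      # ['John Doe']
print(speech_order.findall(sample))     # ['1']
print(len(equals_run.findall(sample)))  # 2
```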

Any suggestions on how to parse the text files are welcome. The ideal help would be guidance on how to generate such a dictionary (or another appropriate data structure) for each file, which can then be written to a database.

Thanks

--EDIT--
One of the files is here: https://pastebin.com/MSvmHb2e
In it, the "Questions and Answers" section is mislabeled as "Presentation", and there is no other "Questions and Answers" section.
One last sample text: https://pastebin.com/jr9WfpV8

3
I would not recommend storing all of the text data in a single dict object. Since you mention there could be a very large number of text files to parse, at runtime the Python process will spend more and more time updating the dict as it grows, and you could run out of memory if some of the files are very large. I would suggest using a DBMS to store this kind of data. - ZdaR
@ZdaR Thank you for the suggestion. After reading your comment I have decided to use a database. I am currently looking into SQLAlchemy. - samkhan13
The mislabeling will not be easy to fix. You would need to build a classifier using machine-learning techniques to classify a section as "Presentation" or "Questions and Answers", because there are no guaranteed cues in the text (pattern recognition with handcrafted rules will not be 100% correct either). - entrophy
Thanks to the Stack Overflow community for the answers and comments. I have awarded the bounty to the answer that handles the pattern recognition with Python and regular expressions. A cascade of if statements forming a state machine can also accomplish the given task, but code that uses regular expressions is easier to tweak or refactor and is more generalizable. - samkhan13
2 Answers

8

The comments in the code should explain everything. Let me know if anything is under-specified and needs more comments.

In short, I use regular expressions to find the '=' delimiter lines and subdivide the whole text into subsections, then handle each type of section separately, for clarity (so you can see how each case is handled).

Side note: I use the words "attendee" and "author" interchangeably.

EDIT: Updated the code to sort based on the '[x]' pattern found next to the attendee/author in the Presentation/Q&A sections. Also changed the pretty-printing part, since pprint does not handle OrderedDict well.

To strip any extra whitespace, including \n, anywhere in a string, just use str.strip(). If you specifically need to strip only \n, then use str.strip('\n').
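A quick illustration of the difference:

```python
s = "  hello world \n"
print(repr(s.strip()))      # 'hello world'   -- strips all surrounding whitespace
print(repr(s.strip('\n')))  # '  hello world ' -- strips only newlines
```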

I have modified the code to strip all whitespace from the talks.

import json
import re
from collections import OrderedDict
from pprint import pprint


# Subdivides a collection of lines based on the delimiting regular expression.
# >>> example_string = '''header
# ... =============================
# ... asdfasdfasdf
# ... sdfasdfdfsdfsdf
# ... =============================
# ... asdfsdfasdfasd'''
# >>> subdivide(example_string, "^=+")
# ['header', 'asdfasdfasdf\nsdfasdfdfsdfsdf', 'asdfsdfasdfasd']
def subdivide(lines, regex):
    equ_pattern = re.compile(regex, re.MULTILINE)
    sections = equ_pattern.split(lines)
    sections = [section.strip('\n') for section in sections]
    return sections


# for processing sections with dashes in them, returns the heading of the section along with
# a dictionary where each key is the subsection's header, and each value is the text in the subsection.
def process_dashed_sections(section):

    subsections = subdivide(section, "^-+")
    heading = subsections[0]  # header of the section.
    d = {key: value for key, value in zip(subsections[1::2], subsections[2::2])}
    index_pattern = re.compile(r"\[(.+)\]", re.MULTILINE)

    # Sort the dictionary by capturing the pattern '[x]' and extracting the number 'x'.
    # This is passed as the key function to 'sorted', so items are ordered by 'x'.
    def sort_key(item):
        mat = index_pattern.findall(item[0])
        if mat:
            return int(mat[0])
        # Subsections that contain '-'s but no '[x]' pattern have no speech order;
        # fall back to 0 so they sort first.
        return 0

    o_d = OrderedDict(sorted(d.items(), key=sort_key))
    return heading, o_d


# this is to rename the keys of 'd' dictionary to the proper names present in the attendees.
# it searches for the best match for the key in the 'attendees' list, and replaces the corresponding key.
# >>> d = {'mr. man   ceo of company   [1]' : ' This is talk a' ,
#  ...     'ms. woman  ceo of company    [2]' : ' This is talk b'}
# >>> l = ['mr. man', 'ms. woman']
# >>> new_d = assign_attendee(d, l)
# new_d = {'mr. man': 'This is talk a', 'ms. woman': 'This is talk b'}
def assign_attendee(d, attendees):
    new_d = OrderedDict()
    for key, value in d.items():
        a = [a for a in attendees if a in key]
        if len(a) == 1:
            # to strip out any additional whitespace anywhere in the text including '\n'.
            new_d[a[0]] = value.strip()
        elif len(a) == 0:
            # to strip out any additional whitespace anywhere in the text including '\n'.
            new_d[key] = value.strip()
    return new_d


if __name__ == '__main__':
    with open('input.txt', 'r') as infile:  # 'infile' avoids shadowing the built-in input()
        lines = infile.read()

        # regex pattern for matching the header of each section
        header_pattern = re.compile(r"^.*[^\n]", re.MULTILINE)

        # regex pattern for matching the sections that contain
        # the list of attendees (those that start with asterisks)
        ppl_pattern = re.compile(r"^(\s+\*)(.+)(\s.*)", re.MULTILINE)

        # regex pattern for matching sections with subsections in them.
        dash_pattern = re.compile(r"^-+", re.MULTILINE)

        ppl_d = dict()
        talks_d = dict()

        # Step 1. Divide the entire document into sections using the '=' divider
        sections = subdivide(lines, "^=+")
        header = ""  # most recently seen section header

        # Step 2. Handle each section like a switch case
        for section in sections:

            # Handle headers
            if len(section.split('\n')) == 1:  # likely to match only a header (assuming headers are single lines)
                header = header_pattern.match(section).string

            # Handle attendees/authors
            elif ppl_pattern.match(section):
                ppls = ppl_pattern.findall(section)
                d = {key.strip(): value.strip() for (_, key, value) in ppls}
                ppl_d.update(d)

                # assuming that if the previous section was detected as a header, then this section will relate
                # to that header
                if header:
                    talks_d.update({header: ppl_d})

            # Handle subsections
            elif dash_pattern.findall(section):
                heading, d = process_dashed_sections(section)

                talks_d.update({heading: d})

            # Else it's just some random text.
            else:

                # assuming that if the previous section was detected as a header, then this section will relate
                # to that header
                if header:
                    talks_d.update({header: section})

        #pprint(talks_d)
        # To assign the talks material to the appropriate attendee/author. Still works if no match found.
        for key, value in talks_d.items():
            talks_d[key] = assign_attendee(value, ppl_d.keys())

        # ordered dict does not pretty print using 'pprint'. So a small hack to make use of json output to pretty print.
        print(json.dumps(talks_d, indent=4))
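Since the question also asks about writing the result to a database, here is a minimal hedged sketch persisting a talks_d-shaped dict to SQLite (the table and column names are invented; the asker mentioned SQLAlchemy, but the standard-library sqlite3 is enough for a sketch):

```python
import json
import sqlite3

# Hypothetical parsed output in the shape produced above.
talks_d = {"Presentation": {"mr. man": "This is talk a"}}

conn = sqlite3.connect(":memory:")  # use a file path for real persistence
conn.execute("""CREATE TABLE IF NOT EXISTS transcripts
                (file_name TEXT, section TEXT, content TEXT)""")
# one row per section; the nested dict is stored as a JSON string
for section, content in talks_d.items():
    conn.execute("INSERT INTO transcripts VALUES (?, ?, ?)",
                 ("input.txt", section, json.dumps(content)))
conn.commit()

rows = conn.execute("SELECT section FROM transcripts").fetchall()
print(rows)  # [('Presentation',)]
```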

I can accept this answer if you include the speech order in talks_d, listed alongside the speech content. The speech order is indicated by the square brackets. It would also be very useful if talks_d were an ordered dictionary. - samkhan13
How can I remove the '\n' from the text in talks_d? - samkhan13

3

Could you please confirm whether you only need the "Presentation" and "Questions and Answers" sections? Also, regarding the output, is it OK to dump a CSV format similar to the one you "manually converted"?

Updated solution that works for every sample file you provided.

The output is taken from cells "D:H", as per the shared "Parsed-transcript" file.

#state = ["other", "head", "present", "qa", "speaker", "data"]
# codes : 0, 1, 2, 3, 4, 5
def writecell(out, data):
    out.write(data)
    out.write(",")

def readfile(fname, outname):
    initstate = 0
    f = open(fname, "r")
    out = open(outname, "w")
    head = ""
    head_written = 0
    quotes = 0
    had_speaker = 0
    for line in f:
        line = line.strip()
        if not line: continue
        if initstate in [0,5] and not any([s for s in line if "=" != s]):  # line consists entirely of '='
            if initstate == 5:
                out.write('"')
                quotes = 0
                out.write("\n")
            initstate = 1
        elif initstate in [0,5] and not any([s for s in line if "-" != s]):  # line consists entirely of '-'
            if initstate == 5:
                out.write('"')
                quotes = 0
                out.write("\n")
                initstate = 4
        elif initstate == 1 and line == "Presentation":
            initstate = 2
            head = "Presentation"
            head_written = 0
        elif initstate == 1 and line == "Questions and Answers":
            initstate = 3
            head = "Questions and Answers"
            head_written = 0
        elif initstate == 1 and not any([s for s in line if "=" != s]):
            initstate = 0
        elif initstate in [2, 3] and not any([s for s in line if ("=" != s and "-" != s)]):  # delimiter line of '=' and/or '-'
            initstate = 4
        elif initstate == 4 and '[' in line and ']' in line:
            comma = line.find(',')
            speech_st = line.find('[')
            speech_end = line.find(']')
            if speech_st == -1:
                initstate = 0
                continue
            if comma == -1:
                firm = ""
                speaker = line[:speech_st].strip()
            else:
                speaker = line[:comma].strip()
                firm = line[comma+1:speech_st].strip()
            # the section heading is written at the start of every speaker row
            writecell(out, head)
            head_written = 0
            order = line[speech_st+1:speech_end]
            writecell(out, speaker)
            writecell(out, firm)
            writecell(out, order)
            had_speaker = 1
        elif initstate == 4 and not any([s for s in line if ("=" != s and "-" != s)]):  # delimiter line of '=' and/or '-'
            if had_speaker:
                initstate = 5
                out.write('"')
                quotes = 1
            had_speaker = 0
        elif initstate == 5:
            line = line.replace('"', '""')
            out.write(line)
        elif initstate == 0:
            continue
        else:
            continue
    f.close()
    if quotes:
        out.write('"')
    out.close()

readfile("Sample1.txt", "out1.csv")
readfile("Sample2.txt", "out2.csv")
readfile("Sample3.txt", "out3.csv")

Details

In this solution there is a state machine that works as follows:

  1. detect whether a heading is present and, if so, write it
  2. detect the speaker once the heading has been written
  3. write the remarks for that speaker
  4. switch to the next speaker, and so on...

You can later process the CSV file as per your needs. Once the basic processing is done, you can also populate the data in any format you like.
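For that later processing step, a hedged sketch of reading the generated CSV back with the standard library (the column order is assumed to match the writecell calls above: heading, speaker, firm, order, speech):

```python
import csv
import io

# A sample row in the shape written by the code above (assumed column order).
sample_csv = 'Presentation,mr. man,Example Corp,1,"Thank you all."\n'

# csv.reader handles the quoted speech field, including any commas inside it
rows = list(csv.reader(io.StringIO(sample_csv)))
print(rows[0])
# ['Presentation', 'mr. man', 'Example Corp', '1', 'Thank you all.']
```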

EDIT:

Please replace the function "writecell" with:

def writecell(out, data):
    data = data.replace('"', '""')
    out.write('"')
    out.write(data)
    out.write('"')
    out.write(",")

Your approach is structurally the closest to what I need. It handles all of the provided sample files well. But sometimes there is a comma after the company name, and that breaks the output structure. I can accept the answer that best solves the problem posed by the sample in the "--EDIT--" part of the question. - samkhan13
It is possible to write the content directly into a CSV file, a dictionary, or a database. - samkhan13
Hi, I have updated my answer based on your feedback. Thanks for the feedback. - mangupt
Thank you very much for your effort. Your approach is interesting and helped me understand my text-recognition task. However, I have awarded the bounty to the answer that is also well suited for other pattern-recognition tasks. - samkhan13
No problem. Glad to help. :) - mangupt
