如何有效地将30GB以上的BZ2 JSON Twitter文件从TAR文件中读取到PostgreSQL中?

6
我正在尝试从archive.org存档中获取Twitter数据并将其加载到数据库中。我首先尝试加载特定月份的所有推文,然后根据地区或标签等条件进行筛选,只留下我感兴趣的推文。
我能够运行下面描述的脚本来完成我的目标,但问题在于速度非常慢。它已经运行了大约半个小时,仅读取了一个TAR文件中内部的 ~6 / 50,000 .bz2文件。
以下是示例TAR文件的一些统计信息:
  • 总大小:~30-40GB
  • 内部.bz2文件数量(按文件夹排列):50,000
  • 一个.bz2文件的大小:~600kb
  • 一个提取的JSON文件的大小:~5MB,~3600条推文。
在优化此过程以提高速度时,我应该注意哪些方面?
  • 我应该在Python中将文件提取到磁盘而不是缓冲它们吗?
  • 我是否应该考虑将进程的一部分多线程化?哪个部分最适合这样做?
  • 或者,我当前获得的速度相对于这样的脚本而言是否正常?

该脚本目前使用了我的CPU约3%和RAM内存约6%。

非常感谢任何帮助。

import tarfile
import dataset # Using dataset as I'm still iteratively developing the table structure(s)
import json
import datetime


def scrape_tar_contents(filename):
    """Iterates over an input TAR filename, retrieving each .bz2 container:
       extracts & retrieves JSON contents; stores JSON contents in a postgreSQL database"""
    tar = tarfile.open(filename, 'r')
    inner_files = [filename for filename in tar.getnames() if filename.endswith('.bz2')]

    num_bz2_files = len(inner_files)
    bz2_count = 1
    print('Starting work on file... ' + filename[-20:])
    for bz2_filename in inner_files: # Loop over all files in the TAR archive
        print('Starting work on inner file... ' + bz2_filename[-20:] + ': ' + str(bz2_count) + '/' + str(num_bz2_files))
        t_extract = tar.extractfile(bz2_filename)
        data = t_extract.read()
        txt = bz2.decompress(data)

        tweet_errors = 0
        current_line = 1
        num_lines = len(txt.split('\n'))
        for line in txt.split('\n'):  # Loop over the lines in the resulting text file.
            if current_line % 100 == 0:
                print('Working on line ' + str(current_line) + '/' + str(num_lines))
                try:
                    tweet = json.loads(line)
                except ValueError, e:
                    error_log = {'Date_time': datetime.datetime.now(),
                                'File_TAR': filename,
                                'File_BZ2': bz2_filename,
                                'Line_number': current_line,
                                'Line': line,
                                'Error': str(e)}
                    tweet_errors += 1
                    db['error_log'].upsert(error_log, ['File_TAR', 'File_BZ2', 'Line_number'])
                    print('Error occured, now at ' + str(tweet_errors))
                try:
                    tweet_id = tweet['id']
                    tweet_text = tweet['text']
                    tweet_locale = tweet['lang']
                    created_at = tweet['created_at']
                    tweet_json = tweet
                    data = {'tweet_id': tweet_id,
                            'tweet_text': tweet_text,
                            'tweet_locale': tweet_locale,
                            'created_at_str': created_at,
                            'date_loaded': datetime.datetime.now(),
                            'tweet_json': tweet_json}
                    db['tweets'].upsert(data, ['tweet_id'])
                except KeyError, e:
                    error_log = {'Date_time': datetime.datetime.now(),
                                'File_TAR': filename,
                                'File_BZ2': bz2_filename,
                                'Line_number': current_line,
                                'Line': line,
                                'Error': str(e)}
                    tweet_errors += 1
                    db['error_log'].upsert(error_log, ['File_TAR', 'File_BZ2', 'Line_number'])
                    print('Error occured, now at ' + str(tweet_errors))
                    continue

if __name__ == "__main__":
    with open("postgresConnecString.txt", 'r') as f:
        db_connectionstring = f.readline()
    db = dataset.connect(db_connectionstring)

    filename = r'H:/Twitter datastream/Sourcefiles/archiveteam-twitter-stream-2013-01.tar'
    scrape_tar_contents(filename)
1个回答

10

一个tar文件不包含文件的位置索引。此外,一个tar文件可以包含同一文件的多个副本。因此,当您提取一个文件时,必须读取整个tar文件。即使它找到了文件, tar文件的其余部分仍然必须被读取以检查是否存在更晚的副本。

这使得提取一个文件与提取所有文件同样耗费时间和精力。

因此,除非您只需要一个文件或没有空间提取所有内容,否则永远不要在大型tar文件上使用 tar.extractfile(...)

如果您有足够的空间(考虑到现代硬盘的大小,您几乎肯定有足够的空间),请使用 tar.extractall 或者调用系统命令 tar xf ... 提取所有内容,然后处理提取出的文件。


代码手动提取后运行速度大大加快:谢谢。我添加了"https://dev59.com/aIbca4cB1Zd3GeqPZrsO" 来请求多线程方面的帮助。你能提供一些相关指导吗? - MattV
为什么Linux的tar命令可以在不将完整未解压内容加载到内存中的情况下提取单个文件或列出清单,而Python的tarfile似乎必须这样做呢?在我的测试中,Python需要使用2GB的内存才能获取一个文件,而tar命令只使用了200KB。看起来Python版本与tar命令相比效率非常低下。 - PKKid
@PKKid:你可能正在看到这种情况。如果这不能解答你的问题,请发布一个新问题。 - unutbu

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接