我正在尝试从archive.org存档中获取Twitter数据并将其加载到数据库中。我首先尝试加载特定月份的所有推文,然后根据地区或标签等条件进行筛选,只留下我感兴趣的推文。
我能够运行下面描述的脚本来完成我的目标,但问题在于速度非常慢。它已经运行了大约半个小时,仅读取了一个TAR文件中内部的 ~6 / 50,000 .bz2文件。
以下是示例TAR文件的一些统计信息:
我能够运行下面描述的脚本来完成我的目标,但问题在于速度非常慢。它已经运行了大约半个小时,仅读取了一个TAR文件中内部的 ~6 / 50,000 .bz2文件。
以下是示例TAR文件的一些统计信息:
- 总大小:~30-40GB
- 内部.bz2文件数量(按文件夹排列):50,000
- 一个.bz2文件的大小:~600kb
- 一个提取的JSON文件的大小:~5MB,~3600条推文。
- 我应该在Python中将文件提取到磁盘而不是缓冲它们吗?
- 我是否应该考虑将进程的一部分多线程化?哪个部分最适合这样做?
- 或者,我当前获得的速度相对于这样的脚本而言是否正常?
该脚本目前使用了我的CPU约3%和RAM内存约6%。
非常感谢任何帮助。
import tarfile
import dataset # Using dataset as I'm still iteratively developing the table structure(s)
import json
import datetime
def scrape_tar_contents(filename):
"""Iterates over an input TAR filename, retrieving each .bz2 container:
extracts & retrieves JSON contents; stores JSON contents in a postgreSQL database"""
tar = tarfile.open(filename, 'r')
inner_files = [filename for filename in tar.getnames() if filename.endswith('.bz2')]
num_bz2_files = len(inner_files)
bz2_count = 1
print('Starting work on file... ' + filename[-20:])
for bz2_filename in inner_files: # Loop over all files in the TAR archive
print('Starting work on inner file... ' + bz2_filename[-20:] + ': ' + str(bz2_count) + '/' + str(num_bz2_files))
t_extract = tar.extractfile(bz2_filename)
data = t_extract.read()
txt = bz2.decompress(data)
tweet_errors = 0
current_line = 1
num_lines = len(txt.split('\n'))
for line in txt.split('\n'): # Loop over the lines in the resulting text file.
if current_line % 100 == 0:
print('Working on line ' + str(current_line) + '/' + str(num_lines))
try:
tweet = json.loads(line)
except ValueError, e:
error_log = {'Date_time': datetime.datetime.now(),
'File_TAR': filename,
'File_BZ2': bz2_filename,
'Line_number': current_line,
'Line': line,
'Error': str(e)}
tweet_errors += 1
db['error_log'].upsert(error_log, ['File_TAR', 'File_BZ2', 'Line_number'])
print('Error occured, now at ' + str(tweet_errors))
try:
tweet_id = tweet['id']
tweet_text = tweet['text']
tweet_locale = tweet['lang']
created_at = tweet['created_at']
tweet_json = tweet
data = {'tweet_id': tweet_id,
'tweet_text': tweet_text,
'tweet_locale': tweet_locale,
'created_at_str': created_at,
'date_loaded': datetime.datetime.now(),
'tweet_json': tweet_json}
db['tweets'].upsert(data, ['tweet_id'])
except KeyError, e:
error_log = {'Date_time': datetime.datetime.now(),
'File_TAR': filename,
'File_BZ2': bz2_filename,
'Line_number': current_line,
'Line': line,
'Error': str(e)}
tweet_errors += 1
db['error_log'].upsert(error_log, ['File_TAR', 'File_BZ2', 'Line_number'])
print('Error occured, now at ' + str(tweet_errors))
continue
if __name__ == "__main__":
with open("postgresConnecString.txt", 'r') as f:
db_connectionstring = f.readline()
db = dataset.connect(db_connectionstring)
filename = r'H:/Twitter datastream/Sourcefiles/archiveteam-twitter-stream-2013-01.tar'
scrape_tar_contents(filename)