优化:将来自流式API的JSON导入Mongo数据库

9

背景: 我已经设置了一个python模块,用于从流API中获取JSON对象并使用pymongo将它们(每次批量插入25个)存储在MongoDB中。为了比较,我还有一个bash命令来从相同的流API进行curl操作,并将其pipemongoimport。这两种方法都将数据存储在单独的集合中。

定期地,我会监视集合的count()以检查它们的情况。

到目前为止,我发现python模块落后于curl | mongoimport方法约1000个JSON对象。

问题: 如何优化我的python模块,使其与curl | mongoimport大致同步?

由于我不是使用Twitter API而是第三方流服务,所以无法使用tweetstream

请问有人能帮我解决这个问题吗?

Python模块:


class StreamReader:
    def __init__(self):
        try:
            self.buff = ""
            self.tweet = ""
            self.chunk_count = 0
            self.tweet_list = []
            self.string_buffer = cStringIO.StringIO()
            self.mongo = pymongo.Connection(DB_HOST)
            self.db = self.mongo[DB_NAME]
            self.raw_tweets = self.db["raw_tweets_gnip"]
            self.conn = pycurl.Curl()
            self.conn.setopt(pycurl.ENCODING, 'gzip')
            self.conn.setopt(pycurl.URL, STREAM_URL)
            self.conn.setopt(pycurl.USERPWD, AUTH)
            self.conn.setopt(pycurl.WRITEFUNCTION, self.handle_data)
            self.conn.perform()
        except Exception as ex:
            print "error ocurred : %s" % str(ex)

    def handle_data(self, data):
        try:
            self.string_buffer = cStringIO.StringIO(data)
            for line in self.string_buffer:
                try:
                    self.tweet = json.loads(line)
                except Exception as json_ex:
                    print "JSON Exception occurred: %s" % str(json_ex)
                    continue

                if self.tweet:
                    try:
                        self.tweet_list.append(self.tweet)
                        self.chunk_count += 1
                        if self.chunk_count % 1000 == 0
                            self.raw_tweets.insert(self.tweet_list)
                            self.chunk_count = 0
                            self.tweet_list = []

                    except Exception as insert_ex:
                        print "Error inserting tweet: %s" % str(insert_ex)
                        continue
        except Exception as ex:
            print "Exception occurred: %s" % str(ex)
            print repr(self.buff)

    def __del__(self):
        self.string_buffer.close()

感谢阅读。
谢谢。

你插入的文档是否有“_id”字段? - Asya Kamsky
您正在使用Mongo的哪个版本以及pymongo的哪个版本? - Asya Kamsky
@AsyaKamsky Python 2.7,MongoDb 2.0.4和PyMongo 2.2。 - Sagar Hatekar
你的Bash Curl脚本/命令长什么样? - Asya Kamsky
我更新了我的回答 - 我认为那是最终的结论 :) - Asya Kamsky
抱歉,我无法发布确切的curl命令,因为它包含我们的凭据。但是,这个问题适用于任何流API。就我们所获得的最终性能提升而言,我已经回答了自己的问题。你的更新答案排名第二。不管怎样,你获得了赏金!再次感谢你查看这个问题。我将定期监控这个线程,并根据最高赞数接受相应的答案。目前来看,那个是你的! - Sagar Hatekar
2个回答

3

最初您的代码中存在一个错误。

                if self.chunk_count % 50 == 0
                    self.raw_tweets.insert(self.tweet_list)
                    self.chunk_count = 0

你重置了chunk_count,但没有重置tweet_list。因此,第二次尝试插入100个项目(50个新项目加上50个已经在上一次发送到数据库的项目)。你已经修复了这个问题,但仍然看到性能差异。
整个批处理大小的事情实际上是一个误导。我尝试使用大型json文件通过Python加载它与通过mongoimport加载它,Python始终更快(即使在安全模式下-见下文)。
仔细查看您的代码,我意识到问题在于流API实际上正在以块的形式传递数据。您应该只是将这些块放入数据库中(这就是mongoimport正在做的)。您的Python额外工作来拆分流,将其添加到列表中,然后定期发送批次到Mongo可能是我看到的和您看到的之间的差异。
尝试使用以下代码片段进行handle_data()。
def handle_data(self, data):
    try:
        string_buffer = StringIO(data)
        tweets = json.load(string_buffer)
    except Exception as ex:
        print "Exception occurred: %s" % str(ex)
    try:
        self.raw_tweets.insert(tweets)
    except Exception as ex:
        print "Exception occurred: %s" % str(ex)

需要注意的是,您的Python插入未在“安全模式”下运行 - 您应该通过向插入语句添加参数safe=True来更改。然后,任何插入失败都会引发异常,您的try/catch将打印错误以暴露问题。

这也不会对性能造成太大影响 - 我目前正在运行一个测试,在约五分钟后,两个集合的大小为14120和14113。


顺便说一下,我试了你的代码 - 经过修复后,Python 插入数据的速度大约是 mongoimport 的两倍。这是因为默认情况下,“安全”插入是关闭的。通过打开安全写入(将 safe=True 传递给 insert),Python 插入时间仍然比 mongoimport 快了约 75%。 - Asya Kamsky
感谢指出这些问题!我已经进行了必要的更改(还更新了上面的代码):在self.chunk_count = 0之后添加了"self.tweet_list = []",并将批处理大小增加到1000。它仍然似乎存在延迟——Python模块计数为5000,而curl mongoimport组合计数为5718(之前是4000:5662)。有什么见解吗? - Sagar Hatekar
由于您每次只插入1000个,所以您总是会看到1000的倍数-似乎它并不落后... - Asya Kamsky
是的,但4000:5662意味着最小延迟仍然为600。这两个地方是否可以进行任何优化 - self.string_buffer = cStringIO.StringIO(data)for line in self.string_buffer? - Sagar Hatekar
我在没有使用curl或pycurl的情况下进行了测试 - 我只是转储了一个大的.json文件,并使用mongoimport和pymongo进行了加载。这就是为什么我怀疑你的问题不是插入速度不够快,也许是没有通过pycurl足够快地获取数据。 - Asya Kamsky
已接受。这将有助于我更好地解决问题。能否帮个忙,通过点赞来中和一下这个问题?有些人没有理由地给了一个-1,导致这个问题的声誉变差:( 顺便说一句,现在我们的分数相同了!=D - Sagar Hatekar

1

已经摆脱了StringIO库。在这种情况下,由于WRITEFUNCTION回调handle_data会为每一行调用一次,因此直接加载JSON即可。然而,有时数据中可能包含两个JSON对象。很抱歉,我不能发布我使用的curl命令,因为它包含我们的凭据。但是,正如我所说,这是适用于任何流API的一般问题。


def handle_data(self, buf): 
    try:
        self.tweet = json.loads(buf)
    except Exception as json_ex:
        self.data_list = buf.split('\r\n')
        for data in self.data_list:
            self.tweet_list.append(json.loads(data))    

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接