Twitter流API - urllib3.exceptions.ProtocolError:('连接中断:IncompleteRead')

11

使用 tweepy 运行一个 Python 脚本,该脚本使用 twitter 流 API 随机获取英文推文并进行一分钟的流式传输,然后切换到使用 twitter 搜索 API 进行一分钟的搜索,然后返回。我发现的问题是,在大约 40 秒左右,流媒体会崩溃并给出以下错误:

完整的错误信息如下:

urllib3.exceptions.ProtocolError: ('Connection broken: IncompleteRead(0 bytes read)', IncompleteRead(0 bytes read))

读取的字节数量可能为 0 到很多。

第一次看到这种情况时,流媒体会提前停止,搜索功能会提前启动;在搜索功能完成后,它再次返回流,并在第二次出现此错误时代码崩溃。

我正在运行的代码是:

# Handles date time calculation
def calculateTweetDateTime(tweet):
    tweetDateTime = str(tweet.created_at)

    tweetDateTime = ciso8601.parse_datetime(tweetDateTime)
    time.mktime(tweetDateTime.timetuple())
    return tweetDateTime

# Checks to see whether that permitted time has past.
def hasTimeThresholdPast():
    global startTime
    if time.clock() - startTime > 60:
        return True
    else:
        return False

#override tweepy.StreamListener to add logic to on_status
class StreamListener(StreamListener):

    def on_status(self, tweet):
        if hasTimeThresholdPast():
            return False

        if hasattr(tweet, 'lang'):
            if tweet.lang == 'en':

                try:
                    tweetText = tweet.extended_tweet["full_text"]
                except AttributeError:
                    tweetText = tweet.text

                tweetDateTime = calculateTweetDateTime(tweet)

                entityList = DataProcessing.identifyEntities(True, tweetText)
                DataStorage.storeHotTerm(entityList, tweetDateTime)
                DataStorage.storeTweet(tweet)


    def on_error(self, status_code):
        def on_error(self, status_code):
            if status_code == 420:
                # returning False in on_data disconnects the stream
                return False


def startTwitterStream():

    searchTerms = []

    myStreamListener = StreamListener()
    twitterStream = Stream(auth=api.auth, listener=StreamListener())
    global geoGatheringTag
    if geoGatheringTag == False:
        twitterStream.filter(track=['the', 'this', 'is', 'their', 'though', 'a', 'an'], async=True, stall_warnings=True)

    if geoGatheringTag == True:
        twitterStream.filter(track=['the', 'this', 'is', 'their', 'though', 'a', 'an', 'they\'re'],
                             async=False, locations=[-4.5091, 55.7562, -3.9814, 55.9563], stall_warnings=True)



# ----------------------- Twitter API Functions ------------------------
# XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX
# --------------------------- Main Function ----------------------------

startTime = 0


def main():
    global startTime
    userInput = ""
    userInput.lower()
    while userInput != "-1":
        userInput = input("Type ACTiVATE to activate the Crawler, or DATABASE to access data analytic option (-1 to exit): \n")
        if userInput.lower() == 'activate':
            while(True):
                startTime = time.clock()

                startTwitterStream()

                startTime = time.clock()
                startTwitterSearchAPI()

if __name__ == '__main__':
    main() 

为避免代码混乱,我删除了搜索功能和数据库处理方面的内容。

如果有人知道出现这种情况的原因以及我该如何解决,请告诉我,我会很感激。


我尝试过的解决方案:
在try/except块中使用http.client.IncompleteRead:
参考Error-while-fetching-tweets-with-tweepy

将Stall_Warning设置为True:
参考Incompleteread-error-when-retrieving-twitter-data-using-python

删除英语语言过滤。

2个回答

12

问题已解决。

对于那些好奇或遇到类似问题的人:经过一些实验,我发现积压的推文是问题所在。每当系统接收到一条推文时,我的系统会运行一个实体识别和存储的进程,这需要花费一小段时间,随着时间的推移,聚集了几百至数千条推文,这个积压越来越大,直到API无法处理它并抛出错误。

解决方案:将您的“on_status/on_data/on_success”函数剥离到基本要素,并在流会话关闭后分别处理任何计算,如存储或实体识别。或者您可以使计算更加高效,并使时间差微不足道,由您决定。


这对我帮助很大,我也遇到了同样的问题。基本上解决方案就是将数据转储并分别进行处理,正如你所正确提到的那样。 - tezzaaa
嗨,谢谢你的回复。但是你说的“剥离'on_status/on_data/on_success'函数”是什么意思呢?我有点困惑,因为在你的StreamListener中甚至没有实现这个函数。 - Steak Overflow
1
@SteakOverflow 你好,"on_status"是在SteamListener下声明的第一个函数: "class StreamListener(StreamListener):def on_status(self, tweet):"我提供的其他名称,如on_data/on_success,是该类型函数常用的替代名称。无论您选择什么名称,关键是在流处于活动状态时尽量减少对数据进行的处理强度,因为这可能会使其超载并导致崩溃。读取数据的任何函数都将被归类为“on_data”函数。 - Chris Cookman
你如何检查 tweepy 的积压情况?我不太确定如何验证该过程。 - A-nak Wannapaschaiyong

0

我只是基于用户Chris Cookman的结果分享我的经验。按照他的建议操作后,我和你遇到的同样问题消失了。但是在我的情况下,我是使用discord.py。所以我创建了一个通用列表(status_list),每当 tweepy on_status 启动时,它都会附加到该通用列表中。

然后我使用 discord.py 设置了 @tasks.loop(seconds=10),以监视 status_list 是否每隔几秒钟为空,如果检测到它有内容,则会循环遍历它,然后启动每个列表上的进程。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接