如何在 tweepy 脚本出错时重新启动?

7
我有一个Python脚本,持续将与跟踪关键词相关的推文存储到文件中。但是,由于下面附加的错误,该脚本往往会反复崩溃。我该如何编辑脚本以使其自动重新启动?我看到了很多解决方案,包括这个(Restarting a program after exception),但我不确定如何在我的脚本中实现它。
import sys
import tweepy
import json
import os

consumer_key=""
consumer_secret=""
access_key = ""
access_secret = ""

auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_key, access_secret)
api = tweepy.API(auth)
# directory that you want to save the json file
os.chdir("C:\Users\json_files")
# name of json file you want to create/open and append json to
save_file = open("12may.json", 'a')

class CustomStreamListener(tweepy.StreamListener):
    def __init__(self, api):
        self.api = api
        super(tweepy.StreamListener, self).__init__()

        # self.list_of_tweets = []

    def on_data(self, tweet):
        print tweet
        save_file.write(str(tweet))

    def on_error(self, status_code):
        print >> sys.stderr, 'Encountered error with status code:', status_code
        return True # Don't kill the stream
        print "Stream restarted"

    def on_timeout(self):
        print >> sys.stderr, 'Timeout...'
        return True # Don't kill the stream
        print "Stream restarted"

sapi = tweepy.streaming.Stream(auth, CustomStreamListener(api))
sapi.filter(track=["test"])

===========================================================================

Traceback (most recent call last):
  File "C:\Users\tweets_to_json.py", line 41, in <module>
    sapi.filter(track=["test"])
  File "C:\Python27\lib\site-packages\tweepy-2.3-py2.7.egg\tweepy\streaming.py", line 316, in filter
    self._start(async)
  File "C:\Python27\lib\site-packages\tweepy-2.3-py2.7.egg\tweepy\streaming.py", line 235, in _start
    self._run()
  File "C:\Python27\lib\site-packages\tweepy-2.3-py2.7.egg\tweepy\streaming.py", line 165, in _run
    self._read_loop(resp)
  File "C:\Python27\lib\site-packages\tweepy-2.3-py2.7.egg\tweepy\streaming.py", line 206, in _read_loop
    for c in resp.iter_content():
  File "C:\Python27\lib\site-packages\requests-1.2.3-py2.7.egg\requests\models.py", line 541, in generate
    chunk = self.raw.read(chunk_size, decode_content=True)
  File "C:\Python27\lib\site-packages\requests-1.2.3-py2.7.egg\requests\packages\urllib3\response.py", line 171, in read
    data = self._fp.read(amt)
  File "C:\Python27\lib\httplib.py", line 543, in read
    return self._read_chunked(amt)
  File "C:\Python27\lib\httplib.py", line 603, in _read_chunked
    value.append(self._safe_read(amt))
  File "C:\Python27\lib\httplib.py", line 660, in _safe_read
    raise IncompleteRead(''.join(s), amt)
IncompleteRead: IncompleteRead(0 bytes read, 1 more expected)

当发生崩溃时,它会进入on_error吗? - sundar nataraj
我认为不是这样的,因为它没有打印“'遇到状态码错误:'”。 - Eugene Yan
(''.join(s), amt) 在你的程序中是这一行。 - sundar nataraj
你是说每次调用流式传输都会出现错误吗? - sundar nataraj
好的,我现在正在按照您的建议运行新程序,我没有看到错误。如果这不是一个好方法,那么有什么更好的方法呢?SO上的另一种解决方案似乎也是将函数调用放在while-try循环中。 - Eugene Yan
显示剩余4条评论
5个回答

19

通过编写一个新的函数来实现while/try循环,我已经想出如何将其纳入流中:

def start_stream():
    while True:
        try:
            sapi = tweepy.streaming.Stream(auth, CustomStreamListener(api))
            sapi.filter(track=["Samsung", "s4", "s5", "note" "3", "HTC", "Sony", "Xperia", "Blackberry", "q5", "q10", "z10", "Nokia", "Lumia", "Nexus", "LG", "Huawei", "Motorola"])
        except: 
            continue

start_stream()

我尝试通过手动使用CMD + C中断程序来测试自动重启功能。不过,如果有更好的测试方法,我很乐意听取建议。


1
我必须捕获 KeyboardInterrupt 以便有一种退出脚本的方式 除非 KeyboardInterrupt: break - Rocco

5

最近我遇到了这个问题,想分享更详细的信息。

引起这个错误的原因是选择的流过滤器太广泛测试。因此,您会以比您可以接受的更快的速度接收流,这会导致IncompleRead错误。

这可以通过细化搜索或使用更具体的异常来解决:

from http.client import IncompleteRead
...
try:
    sapi = tweepy.streaming.Stream(auth, CustomStreamListener(api))
    sapi.filter(track=["test"])
except IncompleRead:
    pass

2

使用递归调用而不是无限 while 循环更好。请看下面的 filter 函数。例如:

from tweepy import Stream
from service.twitter.listener.tweety_listener import TweetyStreamDataListener
from settings import twitter_config

class Tweety(object):
    def __init__(self, listener=TweetyStreamDataListener()):
        self.listener = listener
        self.__auth__ = None

    def __authenticate__(self):
        from tweepy import OAuthHandler
        if self.__auth__ is None:
            self.__auth__ = OAuthHandler(twitter_config['consumer_key'], twitter_config['consumer_secret'])
            self.__auth__.set_access_token(twitter_config['access_token'], twitter_config['access_token_secret'])
        return self.__auth__ is not None

    def __streamer__(self):
        is_authenticated = self.__authenticate__()
        if is_authenticated:
            return Stream(self.__auth__, self.listener)
        return None

    def filter(self, keywords=None, async=True):
        streamer = self.__streamer__()
        try:
            print "[STREAM] Started steam"
            streamer.filter(track=keywords, async=async)
        except Exception as ex:
            print "[STREAM] Stream stopped! Reconnecting to twitter stream"
            print ex.message, ex.args
            self.filter(keywords=keywords, async=async)

我认为恰恰相反:使用递归而不是循环处理这种情况最终会导致内存问题,特别是针对长时间运行的服务。 - Basa

0
我使用 tweepy 编写了一个包含 2 个进程的流式处理程序。它会下载、压缩并将数据转储到文件中,每小时轮换一次。 该程序每小时重新启动一次,并可以定期检查流式处理过程,以查看是否已下载任何新推文。 如果没有,则重新启动整个系统。
代码可以在 这里 找到。 请注意,它使用管道进行压缩。如果不需要压缩,则很容易修改源代码。

0

一个选项是尝试使用模块 multiprocessing 。我会给出两个理由。

  1. 能够在不必“杀死”整个脚本/进程的情况下运行进程一段时间
  2. 您可以将它放在for循环中,并且每当它死亡或您选择终止它时,它就会重新启动。

我采取了完全不同的方法,但部分原因是我正在定期(或者应该是定期)保存我的推文。@ Eugeune Yan,我认为try except是解决问题的简单而优雅的方法。虽然这种方法你不知道何时或是否失败了,但愿有人会对此发表评论;但我不知道这真的重要吗(编写几行代码使其发生很容易)。

import tiipWriter #Twitter & Textfile writer I wrote with Tweepy.
from add import ThatGuy # utility to supply log file names that won't overwrite old ones.
import multiprocessing


if __name__ == '__main__':
        #number of time increments script needs to run        
        n = 60
        dir = "C:\\Temp\\stufffolder\\twiitlog"
        list = []
        print "preloading logs"
        ThatGuy(n,dir,list) #Finds any existing logs in the folder and one-ups it

        for a in list:
            print "Collecting Tweets....."
            # this is my twitter/textfile writer process
            p = multiprocessing.Process(target=tiipWriter.tiipWriter,args = (a,)) 
            p.start()
            p.join(1800) # num of seconds the process will run
            if p.is_alive():
                print " \n Saving Twitter Stream log   @  " + str(a)
                p.terminate()
                p.join()
            a = open(a,'r')
            a.close()
            if a.closed == True:
                print "File successfully closed"
            else: a.close()
            print "jamaica" #cuz why not

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接