使用Tweepy监听流并搜索推文。如何停止先前的搜索并仅监听新流?

18

我正在使用Flask和Tweepy来搜索实时推文。在前端,我有一个用户文本输入框和一个名为“搜索”的按钮。理想情况下,当用户在输入框中输入搜索词并单击“搜索”按钮时,Tweepy应该侦听新的搜索词并停止先前的搜索流。当单击“搜索”按钮时,执行以下函数:

@app.route('/search', methods=['POST'])
# gets search-keyword and starts stream
def streamTweets():
    search_term = request.form['tweet']
    search_term_hashtag = '#' + search_term
    # instantiate listener
    listener = StdOutListener()
    # stream object uses listener we instantiated above to listen for data
    stream = tweepy.Stream(auth, listener)

    if stream is not None:
        print "Stream disconnected..."
        stream.disconnect()

    stream.filter(track=[search_term or search_term_hashtag], async=True)
    redirect('/stream') # execute '/stream' sse
    return render_template('index.html')

上面代码中倒数第二行执行的/stream路由如下:
@app.route('/stream')
def stream():
    # we will use Pub/Sub process to send real-time tweets to client
    def event_stream():
        # instantiate pubsub
        pubsub = red.pubsub()
        # subscribe to tweet_stream channel
        pubsub.subscribe('tweet_stream')
        # initiate server-sent events on messages pushed to channel
        for message in pubsub.listen():
            yield 'data: %s\n\n' % message['data']
    return Response(stream_with_context(event_stream()), mimetype="text/event-stream")

我的代码可以正常工作,也就是说,每当“搜索”按钮被点击时,它会启动一个新的流并搜索给定的术语,但它不会停止前一次搜索。例如,如果我的第一个搜索词是“NYC”,然后我想搜索另一个词,比如“洛杉矶”,它将为我提供“NYC”和“洛杉矶”的结果,这不是我想要的。我只想搜索“洛杉矶”。如何解决这个问题?换句话说,我如何停止以前的流?我查看了其他线程,并知道我必须使用stream.disconnect(),但我不确定如何在我的代码中实现这一点。任何帮助或意见都将非常感激。非常感谢!


1
有关实际项目,请查看此链接 https://github.com/kimasx/twtr-search-map - Raja Simon
你尝试过在函数外部将你创建的流对象(stream)保留为引用,这样你就可以在tweetStream()函数中路由到\search之前首先调用.disconnect方法,然后再创建新的流对象吗?我没有使用过Flask,所以不知道这是否是标准模式,但它似乎应该可以工作。 - J Richard Snape
@JRichardSnape 关于流 object 的引用?怎么做呢?您能回答一下,这样我们可以进一步讨论。 - Raja Simon
@RajaSimon 我的意思是,如果你有多个用户,他们都输入自己的搜索内容,并且你想向他们显示他们的流,直到他们输入另一个搜索内容,此时你想断开连接并向他们显示不同的流,则需要一种方法来保留每个用户正在“监听”的流对象的引用,并将其与该用户关联起来。有点像这里的第一个答案。我不是redis.pubsub()模型的专家,因此需要一些时间来找出最佳/预期的方法来实现这一点。 - J Richard Snape
@rajasimon 我之前没有发布答案,因为我不确定自己是否有足够的专业知识来给出一个能够在大规模情况下有效运作的好答案。 - J Richard Snape
显示剩余3条评论
3个回答

4
以下是一些代码,当创建新的流时,它将取消旧的流。它通过将新的流添加到全局列表中,并在创建新的流时调用stream.disconnect()来断开列表中所有流的连接。
diff --git a/app.py b/app.py
index 1e3ed10..f416ddc 100755
--- a/app.py
+++ b/app.py
@@ -23,6 +23,8 @@ auth.set_access_token(access_token, access_token_secret)
 app = Flask(__name__)
 red = redis.StrictRedis()

+# Add a place to keep track of current streams
+streams = []

 @app.route('/')
 def index():
@@ -32,12 +34,18 @@ def index():
 @app.route('/search', methods=['POST'])
 # gets search-keyword and starts stream
 def streamTweets():
+        # cancel old streams
+        for stream in streams:
+            stream.disconnect()
+
        search_term = request.form['tweet']
        search_term_hashtag = '#' + search_term
        # instantiate listener
        listener = StdOutListener()
        # stream object uses listener we instantiated above to listen for data
        stream = tweepy.Stream(auth, listener)
+        # add this stream to the global list
+        streams.append(stream)
        stream.filter(track=[search_term or search_term_hashtag],
                async=True) # make sure stream is non-blocking
        redirect('/stream') # execute '/stream' sse

这并没有解决会话管理的问题。在您目前的设置中,一个用户的搜索将影响所有用户的搜索。可以通过为用户提供一些标识符并将其流与其标识符一起存储来避免这种情况。最简单的方法可能是使用 Flask 的 session 支持。您也可以像 Pierre 建议的那样使用 requestId 来实现这一点。无论哪种情况,您还需要编写代码以注意到用户何时关闭页面并关闭其流。

我也提交了一个pull request - MattL
如果用户关闭浏览器或前端发生了什么,那么流将在后台永远运行,对吗? - Raja Simon
没错。使用这段代码,流将保持打开状态,直到启动新的流或服务器关闭。 - MattL

1
免责声明:我对Tweepy一无所知,但这似乎是一个设计问题。
您是否正在尝试将状态添加到RESTful API中?您可能有一个设计问题。正如JRichardSnape所回答的那样,您的API不应该负责取消请求;它应该在前端完成。我的意思是,在调用此函数的javascript / AJAX /等中,添加另一个调用,到新函数
@app.route('/cancelSearch', methods=['POST']) 带有具有搜索术语的"POST"。只要您没有状态,您就无法在异步调用中安全地执行此操作:想象一下,其他人同时进行相同的搜索,然后取消其中一个将会取消两个(请记住,您没有状态,因此您不知道要取消谁)。也许您需要状态来设计。
如果您必须继续使用此方法,并且不介意打破“无状态”规则,则可以向您的请求添加“状态”。在这种情况下,情况并不那么糟糕,因为您可以启动一个线程并将其命名为userId,然后在每次新搜索时杀死该线程。
def streamTweets():
    search_term = request.form['tweet']
    userId = request.form['userId'] # If your limit is one request per user at a time. If multiple windows can be opened and you want to follow this limit, store userId in a cookie.
    #Look for any request currently running with this ID, and cancel them

或者,您可以返回一个requestId,然后将其保存在前端中,可以调用cancelSearch?requestId=$requestId。在cancelSearch中,您必须找到未决请求(听起来像是在 tweepy 中,因为您没有使用自己的线程)并断开连接。

出于好奇,我刚刚观察了一下在 Google 上搜索时会发生什么,并且它使用了 GET 请求。看一下(调试工具 -> 网络;然后输入一些文本并查看自动填充)。Google 使用一个与每个请求一起发送的令牌(每次您键入某些内容时都会发送)。这并不意味着它用于此,但基本上就是我所描述的。如果您不想要会话,则使用唯一标识符


你能详细说明一下这行代码吗?#查找当前正在运行的具有此ID的任何请求,并取消它们?如何找到具有该ID的正在运行的请求? - Raja Simon
由于我不了解 tweepy,所以我会用丑陋的方式:一个全局字典 streamDict = {userId : stream}。信号的问题在于你可能会过早或过晚地终止它并浪费资源。如果这是我的项目,我可能会修改 tweepy 以具有查找函数,并使每个流具有标签(或已知的线程名称)。PS:全局变量通常意味着你没有正确的工具。在这种情况下是真的:我不知道正确的 tweepy 工具或者它不存在。 - Pierre-Francoys Brousseau

1

我通过使用计时器方法来解决了它,但我仍在寻找Pythonic的方法。

from streamer import StreamListener
def stream():
    hashtag = input
    #assign each user an ID ( for pubsub )
    StreamListener.userid = random_user_id
    def handler(signum, frame):
        print("Forever is over")
        raise Exception("end of time")

    def main_stream():
        stream = tweepy.Stream(auth, StreamListener())
        stream.filter(track=track,async=True)
        redirect(url_for('map_stream'))

    def close_stream():
        # this is for closing client list in redis but don't know it's working
        obj = redis.client_list(tweet_stream)
        redis_client_list = obj[0]['addr']
        redis.client_kill(redis_client_list)
        stream = tweepy.Stream(auth, StreamListener())
        stream.disconnect()

    import signal
    signal.signal(signal.SIGALRM, handler)
    signal.alarm(300)
    try:
        main_stream()
    except Exception:
        close_stream()
        print("function terminate")

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接