停止Twitter流并使用Twitter4j返回带有状态列表。

5

使用Twitter4j提供的代码示例,我想在收集1,000条状态列表后停止流,并返回此列表。我该如何做到这一点?

public class Stream {
    public List<Status> execute throws TwitterException {

    List<Status> statuses = new ArrayList();

    ConfigurationBuilder cb = new ConfigurationBuilder();
    cb.setDebugEnabled(true);
    cb.setOAuthConsumerKey("bbb");
    cb.setOAuthConsumerSecret("bbb");
    cb.setOAuthAccessToken("bbb");
    cb.setOAuthAccessTokenSecret("bbb");

    TwitterStream twitterStream = new TwitterStreamFactory(cb.build()).getInstance();

    StatusListener listener = new StatusListener() {

        public void onStatus(Status status) {
            statuses.add(status);
            if (statuses.size>1000){
              //return statuses. Obviously that's not the correct place for a return statement...
            }
        }

        public void onDeletionNotice(StatusDeletionNotice statusDeletionNotice) {
            System.out.println("Got a status deletion notice id:" + statusDeletionNotice.getStatusId());
        }

        public void onTrackLimitationNotice(int numberOfLimitedStatuses) {
            System.out.println("Got track limitation notice:" + numberOfLimitedStatuses);
        }

        public void onScrubGeo(long userId, long upToStatusId) {
            System.out.println("Got scrub_geo event userId:" + userId + " upToStatusId:" + upToStatusId);
        }

        public void onException(Exception ex) {
            ex.printStackTrace();
        }
    };

    FilterQuery fq = new FilterQuery();
    String keywords[] = {"Keyword 1", "Keyword 2"};

    fq.track(keywords);

    twitterStream.addListener(listener);
    twitterStream.filter(fq);

}

你有找到任何解决方案吗? - jackyesind
不,我仍然对答案感兴趣! - seinecle
3个回答

6
考虑使用BlockingQueue作为中介,使用监听器将Status对象添加到队列中。
一旦流开始,您可以从队列中取出Status,直到获得所需的一千个。
作为起点,代码应如下所示:
public class Stream {
    private static final int TOTAL_TWEETS = 1000;

    public List<Status> execute() throws TwitterException {
        // skipped for brevity...

        // TODO: You may have to tweak the capacity of the queue, depends on the filter query
        final BlockingQueue<Status> statuses = new LinkedBlockingQueue<Status>(10000); 
        final StatusListener listener = new StatusListener() {

            public void onStatus(Status status) {
                statuses.offer(status); // Add received status to the queue
            }

            // etc...
        };

        final FilterQuery fq = new FilterQuery();
        final String keywords[] = {"Keyword 1", "Keyword 2"};
        fq.track(keywords);

        twitterStream.addListener(listener);
        twitterStream.filter(fq);

        // Collect the 1000 statues
        final List<Status> collected = new ArrayList<Status>(TOTAL_TWEETS);
        while (collected.size() < TOTAL_TWEETS) {
            // TODO: Handle InterruptedException
            final Status status = statuses.poll(10, TimeUnit.SECONDS); 

            if (status == null) {
                // TODO: Consider hitting this too often could indicate no further Tweets
                continue;
            }
            collected.add(status);
        }
        twitterStream.shutdown();

        return collected;
    }
} 

你好,我正在尝试关闭Twitter流,但总是收到一堆警告/错误。我有一个调用清理和关闭的方法,然后我得到:(请参见我的“下面的答案”)。你知道为什么会发生这种情况吗? - Loebre

4

强制将异步代码转换为同步模式并不是一个好主意。请参见https://dev59.com/wFbUa4cB1Zd3GeqPDfnt#5934656 尝试重新设计你的逻辑。

但是,以下代码可以按照你的要求正常工作。

public class Stream {

  public static void main(String[] args) throws TwitterException {
    Stream stream = new Stream();
    stream.execute();
  }

  private final Object lock = new Object();
  public List<Status> execute() throws TwitterException {

    final List<Status> statuses = new ArrayList();

    ConfigurationBuilder cb = new ConfigurationBuilder();
    cb.setDebugEnabled(true);
    cb.setOAuthConsumerKey("bbb");
    cb.setOAuthConsumerSecret("bbb");
    cb.setOAuthAccessToken("bbb");
    cb.setOAuthAccessTokenSecret("bbb");

    TwitterStream twitterStream = new TwitterStreamFactory(cb.build())
        .getInstance();

    StatusListener listener = new StatusListener() {

      public void onStatus(Status status) {
        statuses.add(status);
        System.out.println(statuses.size() + ":" + status.getText());
        if (statuses.size() > 100) {
          synchronized (lock) {
            lock.notify();
          }
          System.out.println("unlocked");
        }
      }

      public void onDeletionNotice(
          StatusDeletionNotice statusDeletionNotice) {
        System.out.println("Got a status deletion notice id:"
            + statusDeletionNotice.getStatusId());
      }

      public void onTrackLimitationNotice(int numberOfLimitedStatuses) {
        System.out.println("Got track limitation notice:"
            + numberOfLimitedStatuses);
      }

      public void onScrubGeo(long userId, long upToStatusId) {
        System.out.println("Got scrub_geo event userId:" + userId
            + " upToStatusId:" + upToStatusId);
      }

      public void onException(Exception ex) {
        ex.printStackTrace();
      }

      @Override
      public void onStallWarning(StallWarning sw) {
        System.out.println(sw.getMessage());

      }
    };

    FilterQuery fq = new FilterQuery();
    String keywords[] = { "federer", "nadal", "#Salute" };

    fq.track(keywords);


    twitterStream.addListener(listener);
    twitterStream.filter(fq);

    try {
      synchronized (lock) {
        lock.wait();
      }
    } catch (InterruptedException e) {
      // TODO Auto-generated catch block
      e.printStackTrace();
    }
    System.out.println("returning statuses");
    twitterStream.shutdown();
    return statuses;
  }
}

我最终接受了这个答案,因为这是我实现的,并且它运行良好。谢谢。 - seinecle

0

有两种方法:

如果你只需要完成消费流的线程(这是调用你的监听器的线程),你可以使用twitterStream.cleanUp();。这将优雅地停止线程。你可能想在你的状态监听器中使用一个boolean stopped变量,以忽略在此事件之后收到的任何调用。

你也可以通过调用twitterStream.shutdown();关闭消费流线程及其分发线程(它是一个守护线程)。然而,这是一种更加粗暴的结束与Twitter API通信的方法。虽然如果你从监听器内部调用它,它也能工作,但我更喜欢@krishnakumarp建议的方法。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接