我正在尝试运行以下代码以获取实时的Twitter信息:
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.twitter._
import org.apache.spark.streaming.StreamingContext._
import twitter4j.auth.Authorization
import twitter4j.Status
import twitter4j.auth.AuthorizationFactory
import twitter4j.conf.ConfigurationBuilder
import org.apache.spark.streaming.api.java.JavaStreamingContext
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext
import org.apache.spark.mllib.feature.HashingTF
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.SparkConf
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.api.java.function.Function
import org.apache.spark.streaming.Duration
import org.apache.spark.streaming.api.java.JavaDStream
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream
val consumerKey = "xxx"
val consumerSecret = "xxx"
val accessToken = "xxx"
val accessTokenSecret = "xxx"
val url = "https://stream.twitter.com/1.1/statuses/filter.json"
val sparkConf = new SparkConf().setAppName("Twitter Streaming")
val sc = new SparkContext(sparkConf)
val documents: RDD[Seq[String]] = sc.textFile("").map(_.split(" ").toSeq)
// Twitter Streaming
val ssc = new JavaStreamingContext(sc,Seconds(2))
val conf = new ConfigurationBuilder()
conf.setOAuthAccessToken(accessToken)
conf.setOAuthAccessTokenSecret(accessTokenSecret)
conf.setOAuthConsumerKey(consumerKey)
conf.setOAuthConsumerSecret(consumerSecret)
conf.setStreamBaseURL(url)
conf.setSiteStreamBaseURL(url)
val filter = Array("Twitter", "Hadoop", "Big Data")
val auth = AuthorizationFactory.getInstance(conf.build())
val tweets : JavaReceiverInputDStream[twitter4j.Status] = TwitterUtils.createStream(ssc, auth, filter)
val statuses = tweets.dstream.map(status => status.getText)
statuses.print()
ssc.start()
当执行到这条命令:val sc = new SparkContext(sparkConf)
,会出现以下错误:
17/05/09 09:08:35 WARN SparkContext: 在同一个JVM中检测到多个正在运行的SparkContext! org.apache.spark.SparkException:在此JVM中只能运行一个SparkContext(请参见SPARK-2243)。要忽略此错误,请设置spark.driver.allowMultipleContexts = true。
我尝试将以下参数添加到sparkConf值中,但错误仍然出现:
val sparkConf = new SparkConf().setAppName("Twitter Streaming").setMaster("local[4]").set("spark.driver.allowMultipleContexts", "true")
如果我忽略错误并继续运行命令,我会得到另一个错误:
17/05/09 09:15:44 警告 ReceiverSupervisorImpl:正在延迟2000毫秒重新启动接收器:401错误接收推文:身份验证凭据(https://dev.twitter.com/pages/auth)缺失或不正确。请确保您设置了有效的消费者密钥/密钥、访问令牌/密钥,并且系统时钟同步。\n\n\n错误401未经授权的HTTP ERROR:401
问题访问'/1.1/statuses/filter.json'。原因:未经授权
非常感谢任何形式的贡献。祝你好运!