I'm hitting the following error while using Kafka 0.10.1.0 with Spark 2.0.2:
private val spark = SparkSession.builder()
  .master("local[*]")
  .appName(job.name)
  .config("spark.cassandra.connection.host", "localhost")
  .config("spark.cassandra.connection.port", "9042")
  .config("spark.streaming.receiver.maxRate", 10000)
  .config("spark.streaming.kafka.maxRatePerPartition", 10000)
  .config("spark.streaming.kafka.consumer.cache.maxCapacity", 1)
  .config("spark.streaming.kafka.consumer.cache.initialCapacity", 1)
  .getOrCreate()

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> config.getString("kafka.hosts"),
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> job.name,
  "auto.offset.reset" -> config.getString("kafka.offset"),
  "enable.auto.commit" -> (false: java.lang.Boolean)
)
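For context, these `kafkaParams` are normally passed into a direct stream via the spark-streaming-kafka-0-10 API. A minimal sketch of that wiring is below; the topic name, batch interval, and stream creation are my assumptions, since the original post does not show this part:

```scala
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

// Hypothetical stream setup (batch interval and topic are placeholders).
val ssc = new StreamingContext(spark.sparkContext, Seconds(5))
val topics = Seq("my-topic") // assumed topic name

// Each executor thread gets a CachedKafkaConsumer per topic-partition;
// KafkaConsumer itself is not thread-safe, which is what the exception
// below complains about when two tasks touch the same cached consumer.
val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent, // distribute partitions evenly across executors
  Subscribe[String, String](topics, kafkaParams)
)
```

Note that `spark.streaming.kafka.consumer.cache.maxCapacity` is set to 1 in the builder above, so with more than one partition per executor the cached consumers churn constantly, which makes concurrent access to the same consumer more likely.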
Exception:
java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1557)
at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1177)
at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.seek(CachedKafkaConsumer.scala:95)
at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:69)
at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227)
at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:194)
at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
at org.apache.spark.scheduler.Task.run(Task.scala:86)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
I have seen a mail thread about this, but it contains no solution yet. See: https://www.mail-archive.com/user@spark.apache.org/msg56566.html