Spark 2.0.2: java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access


I am hitting the following error with Kafka 0.10.1.0 and Spark 2.0.2:

private val spark = SparkSession.builder()
.master("local[*]")
.appName(job.name)
.config("spark.cassandra.connection.host", "localhost")
.config("spark.cassandra.connection.port","9042")
.config("spark.streaming.receiver.maxRate", 10000)
.config("spark.streaming.kafka.maxRatePerPartition", 10000)
.config("spark.streaming.kafka.consumer.cache.maxCapacity", 1)
.config("spark.streaming.kafka.consumer.cache.initialCapacity", 1)
.getOrCreate()

val kafkaParams = Map[String, Object](
"bootstrap.servers" -> config.getString("kafka.hosts"),
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> job.name,
"auto.offset.reset" -> config.getString("kafka.offset"),
"enable.auto.commit" -> (false: java.lang.Boolean)
)
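For context, these params are typically wired into a direct stream roughly as follows (a sketch only: the topic name and batch interval are placeholders, and it assumes the spark-streaming-kafka-0-10 artifact on the classpath — the exception below is thrown from inside this direct-stream consumer machinery):

import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

// Sketch: 5-second batch interval and topic name are assumptions,
// not taken from the original question.
val ssc = new StreamingContext(spark.sparkContext, Seconds(5))

val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,                                  // spread partitions evenly across executors
  Subscribe[String, String](Seq("my-topic"), kafkaParams)  // "my-topic" is a placeholder
)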

Exception:

java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
    at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1557)
    at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1177)
    at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.seek(CachedKafkaConsumer.scala:95)
    at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:69)
    at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227)
    at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:194)
    at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
    at org.apache.spark.scheduler.Task.run(Task.scala:86)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

I have seen a mailing-list thread about this, but no solution yet. See: https://www.mail-archive.com/user@spark.apache.org/msg56566.html

1 Answer

Ran into the same error and could not find a fix. Instead, I used "--executor-cores 1" with spark-submit to avoid the problem. If anyone finds a solution, please post it.
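For reference, the workaround looks roughly like this on the command line (class and jar names are placeholders; the idea is that with one core per executor only one task runs at a time, so two tasks can no longer touch the same cached KafkaConsumer concurrently):

# Workaround sketch: limit each executor to a single core.
# The --class value and jar name below are hypothetical.
spark-submit \
  --master yarn \
  --executor-cores 1 \
  --class com.example.MyStreamingJob \
  my-streaming-job.jar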
