无法找到隐式值以供证据参数使用

4

我正在编写一个简单的flink字数统计作业,但是一直出现以下错误:

could not find implicit value for evidence parameter of type org.apache.flink.api.common.typeinfo.TypeInformation[String]
[error]  .flatMap{_.toLowerCase.split("\\W+") filter {_.nonEmpty}}

我在网上搜索了很久,但是没有找到任何易懂的答案。

这是我的代码:

object Job {
  def main(args: Array[String]) {
    // set up the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val dataStream = env.readTextFile("file:///home/plivo/code/flink/scala/flinkstream/test/")

    val count = dataStream
                .flatMap{_.toLowerCase.split("\\W+") filter {_.nonEmpty}}
                .map{ (_,1) }
                .groupBy(0)
                .sum(1)


    dataStream.print()
    env.execute("Flink Scala API Skeleton")
    }
}

尝试一下这个问题的答案,它可能也能帮到你:https://dev59.com/e10b5IYBdhLWcg3wJ-bb - richj
我已经导入了所有必要的库,包括 flink.api.scala._ 和 flink.streaming.api.scala._。 - sidd607
问题在于 flink(版本1.0.3)的 DataStream[(String, Int)] 上没有 groupBy(...) 方法。但是有一个 keyBy(Int) 方法,它将生成一个 KeyedStream[(String, Int), Tuple]。 - richj
你能否尝试移除 import flink.api.scala._,因为流处理和批处理的 Scala 包对象都会导入 createTypeInformation。因此这些导入可能会冲突。 - Till Rohrmann
2个回答

阿里云服务器只需要99元/年,新老用户同享,点击查看详情
3

您需要导入

import org.apache.flink.api.scala._
为了启用隐式转换,而不是为您使用的每种类型创建隐式值。

1

def main(args: Array[String]) {...}的第一行添加implicit val typeInfo = TypeInformation.of(classOf[(String)]),对我来说解决了问题。

object Job {
  def main(args: Array[String]) {
    implicit val typeInfo = TypeInformation.of(classOf[(String)]) //Add this here
    // set up the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val dataStream = env.readTextFile("file:///home/plivo/code/flink/scala/flinkstream/test/")

    val count = dataStream
                .flatMap{_.toLowerCase.split("\\W+") filter {_.nonEmpty}}
                .map{ (_,1) }
                .groupBy(0)
                .sum(1)


    dataStream.print()
    env.execute("Flink Scala API Skeleton")
    }
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,