Flink + Kafka + JSON - Java example


I am trying to consume JSON from a Kafka topic with the following code:

public class FlinkMain {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // parse user parameters
        ParameterTool parameterTool = ParameterTool.fromArgs(args);

        DataStream messageStream = env.addSource(
                new FlinkKafkaConsumer09<>(parameterTool.getRequired("topic")
                , new JSONKeyValueDeserializationSchema(false), parameterTool.getProperties()));

        messageStream.map(new MapFunction<String, String>() {
            private static final long serialVersionUID = -6867736771747690202L;

            @Override
            public String map(String value) throws Exception {
                return "Kafka and Flink says: " + value;
            }
        });

        env.execute();
    }
}

The problems are as follows:
1) For some reason the program does not run, failing with:
Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: The return type of function 'main(FlinkMain.java:23)' could not be determined automatically, due to type erasure. You can give type information hints by using the returns(...) method on the result of the transformation call, or by letting your function implement the 'ResultTypeQueryable' interface.

The problem is at line: `messageStream.map(....`
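For background on what the exception is talking about: Java erases generic type parameters at runtime, but an anonymous subclass records its generic supertype in the class file, and Flink's type extraction relies on exactly that to recover a function's input and output types. The following Flink-free sketch illustrates the mechanism; `MapFn` is a hypothetical stand-in for Flink's `MapFunction`, not the real interface:

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

public class TypeErasureDemo {
    // Hypothetical stand-in for Flink's MapFunction, for illustration only.
    interface MapFn<I, O> {
        O map(I in);
    }

    public static void main(String[] args) {
        // An anonymous subclass records its generic supertype in the class
        // file, which is how frameworks can recover I and O despite erasure.
        MapFn<String, String> fn = new MapFn<String, String>() {
            @Override
            public String map(String in) {
                return "Kafka and Flink says: " + in;
            }
        };
        ParameterizedType sup =
                (ParameterizedType) fn.getClass().getGenericInterfaces()[0];
        for (Type t : sup.getActualTypeArguments()) {
            System.out.println(t.getTypeName()); // prints java.lang.String twice
        }
    }
}
```

When this inference fails (for example because the stream variable is declared with a raw type), the `returns(...)` hint mentioned in the exception message is the escape hatch.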

2) The problem above may be related to the DataStream having no type parameter. But if I try:

DataStream<String> messageStream = env.addSource(...

the code no longer compiles, because of cannot resolve constructor FlinkKafkaConsumer09 ...

pom.xml (the relevant parts):

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.1.1</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>1.1.1</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.11</artifactId>
        <version>1.1.1</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka-0.9_2.11</artifactId>
        <version>1.1.1</version>
    </dependency>
</dependencies>

I have been looking for Flink code that uses a JSON DeserializationSchema, without success. The only thing I found was a unit test of JSONKeyValueDeserializationSchema here: link. Does anyone know the right way to do this?
Thanks.
2 Answers


I followed Vishnu viswanath's answer, but JSONKeyValueDeserializationSchema throws an exception during the JSON parsing step even for a simple JSON such as {"name":"John Doe"}.

The code that throws the exception:

DataStream<ObjectNode> messageStream = env.addSource(
    new FlinkKafkaConsumer09<>(parameterTool.getRequired("topic")
    , new JSONKeyValueDeserializationSchema(false), parameterTool.getProperties()));

messageStream.rebalance().map(new MapFunction<ObjectNode, String>() {
    private static final long serialVersionUID = -6867736771747690202L;

    @Override
    public String map(ObjectNode node) throws Exception {
        return "Kafka and Flink says: " + node.get(0);
    }
}).print();

Output:

09/05/2016 11:16:02 Job execution switched to status FAILED.
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:822)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:768)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:768)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.NullPointerException
    at com.fasterxml.jackson.core.JsonFactory.createParser(JsonFactory.java:790)
    at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2215)
    at org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema.deserialize(JSONKeyValueDeserializationSchema.java:52)
    at org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema.deserialize(JSONKeyValueDeserializationSchema.java:38)
    at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.run(Kafka09Fetcher.java:227)
    at java.lang.Thread.run(Thread.java:745)

I had success with another deserialization schema, JSONDeserializationSchema. (The NullPointerException above apparently comes from JSONKeyValueDeserializationSchema trying to parse the record key, which is null when the messages are produced without a key.)

DataStream<ObjectNode> messageStream = env.addSource(
    new FlinkKafkaConsumer09<>(parameterTool.getRequired("topic")
    , new JSONDeserializationSchema(), parameterTool.getProperties()));

messageStream.rebalance().map(new MapFunction<ObjectNode, String>() {
    private static final long serialVersionUID = -6867736771747690202L;

    @Override
    public String map(ObjectNode value) throws Exception {
        return "Kafka and Flink says: " + value.get("key").asText();
    }
}).print();


Your error is at the line messageStream.map(new MapFunction<String, String>(). The MapFunction you defined expects an input of type String and an output of type String, but since you are using JSONKeyValueDeserializationSchema, which turns the String into a com.fasterxml.jackson.databind.node.ObjectNode, your MapFunction should instead expect an input of type ObjectNode. Try the following code.

messageStream.map(new MapFunction<ObjectNode, String>() {
    private static final long serialVersionUID = -6867736771747690202L;

    @Override
    public String map(ObjectNode node) throws Exception {
        // JSONKeyValueDeserializationSchema wraps each record into an
        // ObjectNode with "key" and "value" fields; get(0) on an ObjectNode
        // returns null, so access the fields by name instead.
        return "Kafka and Flink says: " + node.get("value");
    }
});
