Flink
消费者,消费者将消费第11个数据。因此,我有三个问题:
- 如何获取当前主题的分区数和每个分区的偏移量?
- 如何手动为每个分区设置消费者的起始位置?
- 如果
Flink
消费者崩溃,并且在几分钟后恢复,该如何知道从哪里重新开始消费?
FlinkKafkaConsumer08
、FlinkKafkaConsumer10
但都会出现异常):public class kafkaConsumer {
public static void main(String[] args) throws Exception {
// create execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000);
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "192.168.95.2:9092");
properties.setProperty("group.id", "test");
properties.setProperty("auto.offset.reset", "earliest");
FlinkKafkaConsumer09<String> myConsumer = new FlinkKafkaConsumer09<>(
"game_event", new SimpleStringSchema(), properties);
DataStream<String> stream = env.addSource(myConsumer);
stream.map(new MapFunction<String, String>() {
private static final long serialVersionUID = -6867736771747690202L;
@Override
public String map(String value) throws Exception {
return "Stream Value: " + value;
}
}).print();
env.execute();
}
}
以及pom.xml文件:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>1.6.1</version>
</dependency>