我有一个非常简单的Apache Flink管道,我想通过Apache Flink仪表板获取一些指标,如Apache Flink文档中所述:
我使用Docker-Hub提供的Docker设置来运行该管道。所有内容都使用Apache Flink 1.10.0。管道可以正常运行,但是当我尝试查看指标时,我只得到以下结果:。现在我在想我做错了什么?我是否缺少某些配置参数或者这不是在仪表板中查看该指标的正确位置?请问有人能够提供建议或者至少指向一个资源,让我能够获得更多信息吗?
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
public class Pipeline {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.addSource(new RichSourceFunction<String>() {
private static final long serialVersionUID = 3990963645875188158L;
private boolean notCanceled = true;
@Override
public void run(SourceContext<String> ctx) throws Exception {
while (notCanceled) {
ctx.collect("test");
}
}
@Override
public void cancel() {
notCanceled = false;
}
}).map(new RichMapFunction<String, String>() {
private static final long serialVersionUID = 1L;
private transient Counter counter;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
this.counter = getRuntimeContext()
.getMetricGroup()
.counter("myCounter");
}
@Override
public String map(String value) throws Exception {
this.counter.inc();
return "mappedtext";
}
}).print();
env.execute();
}
}
我使用Docker-Hub提供的Docker设置来运行该管道。所有内容都使用Apache Flink 1.10.0。管道可以正常运行,但是当我尝试查看指标时,我只得到以下结果:。现在我在想我做错了什么?我是否缺少某些配置参数或者这不是在仪表板中查看该指标的正确位置?请问有人能够提供建议或者至少指向一个资源,让我能够获得更多信息吗?