我正在尝试使用Apache Beam执行管道,但在尝试放置一些输出标签时出现错误:
import com.google.cloud.Tuple;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.joda.time.Duration;
import java.lang.reflect.Type;
import java.util.Map;
import java.util.stream.Collectors;
/**
* The Transformer.
*/
class Transformer {
final static TupleTag<Map<String, String>> successfulTransformation = new TupleTag<>();
final static TupleTag<Tuple<String, String>> failedTransformation = new TupleTag<>();
/**
* The entry point of the application.
*
* @param args the input arguments
*/
public static void main(String... args) {
TransformerOptions options = PipelineOptionsFactory.fromArgs(args)
.withValidation()
.as(TransformerOptions.class);
Pipeline p = Pipeline.create(options);
p.apply("Input", PubsubIO
.readMessagesWithAttributes()
.withIdAttribute("id")
.fromTopic(options.getTopicName()))
.apply(Window.<PubsubMessage>into(FixedWindows
.of(Duration.standardSeconds(60))))
.apply("Transform",
ParDo.of(new JsonTransformer())
.withOutputTags(successfulTransformation,
TupleTagList.of(failedTransformation)));
p.run().waitUntilFinish();
}
/**
* Deserialize the input and convert it to a key-value pairs map.
*/
static class JsonTransformer extends DoFn<PubsubMessage, Map<String, String>> {
/**
* Process each element.
*
* @param c the processing context
*/
@ProcessElement
public void processElement(ProcessContext c) {
String messagePayload = new String(c.element().getPayload());
try {
Type type = new TypeToken<Map<String, String>>() {
}.getType();
Gson gson = new Gson();
Map<String, String> map = gson.fromJson(messagePayload, type);
c.output(map);
} catch (Exception e) {
LOG.error("Failed to process input {} -- adding to dead letter file", c.element(), e);
String attributes = c.element()
.getAttributeMap()
.entrySet().stream().map((entry) ->
String.format("%s -> %s\n", entry.getKey(), entry.getValue()))
.collect(Collectors.joining());
c.output(failedTransformation, Tuple.of(attributes, messagePayload));
}
}
}
}
显示的错误为:
我尝试了不同的方法来解决这个问题,但我认为我并不理解问题所在。我知道这些行导致了错误的发生:主线程中的异常 "main" java.lang.IllegalStateException: 无法返回Transform.out1 [PCollection]的默认编码器。请纠正以下根本原因之一:未手动指定编码器;您可以使用.setCoder()来指定。从CoderRegistry推断编码器失败:无法为V提供编码器。使用已注册的CoderProvider构建编码器失败。请参阅抑制的异常以获取详细的失败信息。使用生成PTransform的默认输出编码器失败:无法为V提供编码器。使用已注册的CoderProvider构建编码器失败。
.withOutputTags(successfulTransformation,TupleTagList.of(failedTransformation))
但我不清楚其中的哪个部分,哪个部分需要特定的编码器,以及错误中的"V"是什么意思(来自"无法为V提供编码器")。
为什么会出现这个错误?我也尝试查看Apache Beam的文档,但它们似乎没有解释这样的用法,我也从讨论编码器的部分中没有理解太多。
谢谢
Tuple
,你的答案解决了我的问题。为什么?我所做的和你建议的有什么不同?另外,你有没有猜测为什么Google Cloud的Tuple
没有继承Serializable
并且不能被继承? - Jacnew TupleTag<>()
没有类型信息,因此这是不可能的。new TupleTag<String>() {}
实际上创建了一个没有泛型参数的匿名子类,这使我们可以通过反射进行技巧操作并确定实际类型为String
,然后让我们查找Coder<String>
。 - Ben ChambersTuple
非常简单:https://googlecloudplatform.github.io/google-cloud-java/0.25.0/apidocs/com/google/cloud/Tuple.html - Jac