Apache Beam - 无法在具有多个输出标签的DoFn上推断编码器

8

我正在尝试使用Apache Beam执行管道,但在尝试放置一些输出标签时出现错误:

import com.google.cloud.Tuple;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.joda.time.Duration;

import java.lang.reflect.Type;
import java.util.Map;
import java.util.stream.Collectors;

/**
 * The Transformer.
 */
class Transformer {
    final static TupleTag<Map<String, String>> successfulTransformation = new TupleTag<>();
    final static TupleTag<Tuple<String, String>> failedTransformation = new TupleTag<>();

    /**
     * The entry point of the application.
     *
     * @param args the input arguments
     */
    public static void main(String... args) {
        TransformerOptions options = PipelineOptionsFactory.fromArgs(args)
                .withValidation()
                .as(TransformerOptions.class);

        Pipeline p = Pipeline.create(options);

        p.apply("Input", PubsubIO
                .readMessagesWithAttributes()
                .withIdAttribute("id")
                .fromTopic(options.getTopicName()))
                .apply(Window.<PubsubMessage>into(FixedWindows
                        .of(Duration.standardSeconds(60))))
                .apply("Transform",
                        ParDo.of(new JsonTransformer())
                                .withOutputTags(successfulTransformation,
                                        TupleTagList.of(failedTransformation)));

        p.run().waitUntilFinish();
    }

    /**
     * Deserialize the input and convert it to a key-value pairs map.
     */
    static class JsonTransformer extends DoFn<PubsubMessage, Map<String, String>> {

        /**
         * Process each element.
         *
         * @param c the processing context
         */
        @ProcessElement
        public void processElement(ProcessContext c) {
            String messagePayload = new String(c.element().getPayload());
            try {
                Type type = new TypeToken<Map<String, String>>() {
                }.getType();
                Gson gson = new Gson();
                Map<String, String> map = gson.fromJson(messagePayload, type);
                c.output(map);
            } catch (Exception e) {
                LOG.error("Failed to process input {} -- adding to dead letter file", c.element(), e);
                String attributes = c.element()
                        .getAttributeMap()
                        .entrySet().stream().map((entry) ->
                                String.format("%s -> %s\n", entry.getKey(), entry.getValue()))
                        .collect(Collectors.joining());
                c.output(failedTransformation, Tuple.of(attributes, messagePayload));
            }

        }
    }
}

显示的错误为:

主线程中的异常 "main" java.lang.IllegalStateException: 无法返回Transform.out1 [PCollection]的默认编码器。请纠正以下根本原因之一:未手动指定编码器;您可以使用.setCoder()来指定。从CoderRegistry推断编码器失败:无法为V提供编码器。使用已注册的CoderProvider构建编码器失败。请参阅抑制的异常以获取详细的失败信息。使用生成PTransform的默认输出编码器失败:无法为V提供编码器。使用已注册的CoderProvider构建编码器失败。

我尝试了不同的方法来解决这个问题,但我认为我并不理解问题所在。我知道这些行导致了错误的发生:
.withOutputTags(successfulTransformation,TupleTagList.of(failedTransformation))

但我不清楚其中的哪个部分,哪个部分需要特定的编码器,以及错误中的"V"是什么意思(来自"无法为V提供编码器")。

为什么会出现这个错误?我也尝试查看Apache Beam的文档,但它们似乎没有解释这样的用法,我也从讨论编码器的部分中没有理解太多。

谢谢

2个回答

22

首先,我建议以下更改:

final static TupleTag<Map<String, String>> successfulTransformation = 
    new TupleTag<>();
final static TupleTag<Tuple<String, String>> failedTransformation = 
    new TupleTag<>();

转化为这样:

final static TupleTag<Map<String, String>> successfulTransformation = 
    new TupleTag<Map<String, String>>() {};
final static TupleTag<Tuple<String, String>> failedTransformation = 
    new TupleTag<Tuple<String, String>>() {};

这应该有助于编码推断确定边缘输出的类型。此外,您是否已经为Tuple正确注册了CoderProvider


更改在当前阶段似乎没有任何作用。不,我没有为元组注册CoderProvider,因为我不清楚我应该如何做,您能否提供更多信息? - Jac
我用自己创建的一个继承Serializable类的类替换了Tuple,你的答案解决了我的问题。为什么?我所做的和你建议的有什么不同?另外,你有没有猜测为什么Google Cloud的Tuple没有继承Serializable并且不能被继承? - Jac
1
我不熟悉Google Cloud的Tuple。你能提供一下它的来源链接吗?通常,使用Serializable会比使用自定义编码器或类似Avro的东西效率低,因为它使用了Java序列化。请参阅注册编码器的文档。 - Ben Chambers
4
这个问题涉及到Java泛型和擦除。在SDK中,为了推断使用哪个编码器,我们需要确定类型。然而,在类型擦除后,new TupleTag<>()没有类型信息,因此这是不可能的。new TupleTag<String>() {}实际上创建了一个没有泛型参数的匿名子类,这使我们可以通过反射进行技巧操作并确定实际类型为String,然后让我们查找Coder<String> - Ben Chambers
无论如何,Google Cloud的Tuple非常简单:https://googlecloudplatform.github.io/google-cloud-java/0.25.0/apidocs/com/google/cloud/Tuple.html - Jac

2
感谢@Ben Chambers的回答,Kotlin是:
val successTag = object : TupleTag<MyObj>() {}
val deadLetterTag = object : TupleTag<String>() {}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接