ReadFromKafka throws ValueError: Unsupported signal: 2


I'm currently trying to learn Apache Beam and Apache Kafka.

The Kafka service is running locally, and I have written some test messages with kafka-console-producer.

First, I wrote this Java snippet to try out Apache Beam in a language I'm familiar with. It works as expected.

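import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.io.kafka.KafkaIO.Read;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class Main {

  public static void main(String[] args) {

    Pipeline pipeline = Pipeline.create();

    // Unbounded source reading the local "beam-test" topic.
    Read<Long, String> kafkaReader = KafkaIO.<Long, String>read()
        .withBootstrapServers("localhost:9092")
        .withTopic("beam-test")
        .withKeyDeserializer(LongDeserializer.class)
        .withValueDeserializer(StringDeserializer.class);

    // Note: withoutMetadata() returns a new transform instead of modifying
    // kafkaReader, so this call has no effect; the DoFn below still
    // receives full KafkaRecord elements.
    kafkaReader.withoutMetadata();

    pipeline
        .apply("Kafka", kafkaReader)
        .apply("Extract words", ParDo.of(new DoFn<KafkaRecord<Long, String>, String>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            // Print the key and value of each Kafka record.
            System.out.println("Key:" + c.element().getKV().getKey()
                + " | Value: " + c.element().getKV().getValue());
          }
        }));

    pipeline.run();
  }
}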

My goal is to write the same thing in Python. This is what I have so far:

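import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka
from apache_beam.options.pipeline_options import PipelineOptions


def run_pipe():
    # Read from the local Kafka topic and print every record.
    with beam.Pipeline(options=PipelineOptions()) as p:
        (p
        | 'Kafka Unbounded' >> ReadFromKafka(consumer_config={'bootstrap.servers': 'localhost:9092'}, topics=['beam-test'])
        | 'Test Print' >> beam.Map(print)
        )

if __name__ == '__main__':
    run_pipe()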

Now for the problem. When I try to run the Python code, I get the following error:

(app) λ python ArghKafkaExample.py 
Traceback (most recent call last):
  File "ArghKafkaExample.py", line 22, in <module>
    run_pipe()
  File "ArghKafkaExample.py", line 10, in run_pipe
    (p
  File "C:\Users\gamef\git\BeamMeScotty\app\lib\site-packages\apache_beam\transforms\ptransform.py", line 1028, in __ror__
    return self.transform.__ror__(pvalueish, self.label)
  File "C:\Users\gamef\git\BeamMeScotty\app\lib\site-packages\apache_beam\transforms\ptransform.py", line 572, in __ror__
    result = p.apply(self, pvalueish, label)
  File "C:\Users\gamef\git\BeamMeScotty\app\lib\site-packages\apache_beam\pipeline.py", line 648, in apply
    return self.apply(transform, pvalueish)
  File "C:\Users\gamef\git\BeamMeScotty\app\lib\site-packages\apache_beam\pipeline.py", line 691, in apply
    pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
  File "C:\Users\gamef\git\BeamMeScotty\app\lib\site-packages\apache_beam\runners\runner.py", line 198, in apply
    return m(transform, input, options)
  File "C:\Users\gamef\git\BeamMeScotty\app\lib\site-packages\apache_beam\runners\runner.py", line 228, in apply_PTransform
    return transform.expand(input)
  File "C:\Users\gamef\git\BeamMeScotty\app\lib\site-packages\apache_beam\transforms\external.py", line 322, in expand
    self._expanded_components = self._resolve_artifacts(
  File "C:\Users\gamef\AppData\Local\Programs\Python\Python38\lib\contextlib.py", line 120, in __exit__
    next(self.gen)
  File "C:\Users\gamef\git\BeamMeScotty\app\lib\site-packages\apache_beam\transforms\external.py", line 372, in _service
    yield stub
  File "C:\Users\gamef\git\BeamMeScotty\app\lib\site-packages\apache_beam\transforms\external.py", line 523, in __exit__
    self._service_provider.__exit__(*args)
  File "C:\Users\gamef\git\BeamMeScotty\app\lib\site-packages\apache_beam\utils\subprocess_server.py", line 74, in __exit__
    self.stop()
  File "C:\Users\gamef\git\BeamMeScotty\app\lib\site-packages\apache_beam\utils\subprocess_server.py", line 133, in stop
    self.stop_process()
  File "C:\Users\gamef\git\BeamMeScotty\app\lib\site-packages\apache_beam\utils\subprocess_server.py", line 179, in stop_process
    return super(JavaJarServer, self).stop_process()
  File "C:\Users\gamef\git\BeamMeScotty\app\lib\site-packages\apache_beam\utils\subprocess_server.py", line 143, in stop_process
    self._process.send_signal(signal.SIGINT)
  File "C:\Users\gamef\AppData\Local\Programs\Python\Python38\lib\subprocess.py", line 1434, in send_signal
    raise ValueError("Unsupported signal: {}".format(sig))
ValueError: Unsupported signal: 2

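From googling, I found out that this has something to do with signals that terminate a program (like Ctrl+C), but overall I have absolutely no idea what the problem is.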

Any advice would be helpful!

Regards, Pascal

1 Answer

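Your pipeline code looks correct. The problem comes from the requirements of Kafka IO in the Python SDK. From the module documentation:
These transforms are currently supported by Beam portable runners (for example, portable Flink and Spark) as well as Dataflow runner.
Transforms provided in this module are cross-language transforms implemented in the Beam Java SDK. During the pipeline construction, Python SDK will connect to a Java expansion service to expand these transforms. To facilitate this, a small amount of setup is needed before using these transforms in a Beam Python pipeline.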
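Kafka IO is implemented in Python as a cross-language transform written in Java, and your pipeline fails because your environment is not set up to execute cross-language transforms. In plain terms, a cross-language transform means that the Kafka transform actually runs on the Java SDK rather than the Python SDK, so it can reuse the existing Kafka code in Java.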
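Two obstacles keep your pipeline from working. The easier one to fix is that only the runners named in the passage I quoted support cross-language transforms, so if you are running this pipeline on the direct runner, it won't work. You need to switch to the Flink or Spark runner in local mode.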
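A minimal sketch of that switch, assuming a local machine and the Flink runner: the runner is chosen through pipeline options, passed here as command-line style flags. --runner=FlinkRunner, --flink_master, --environment_type=LOOPBACK and --streaming are standard Beam Python pipeline options, but this particular combination and its values are assumptions for a local experiment, so check them against the Flink runner documentation for your Beam version. The helper flink_pipeline_args is hypothetical, and Beam is imported inside run_pipe only so that the argument-building part runs without Beam installed. This only addresses the runner; the expansion service still has to work.

```python
from typing import List, Optional


def flink_pipeline_args(flink_master: Optional[str] = None,
                        streaming: bool = True) -> List[str]:
    """Build command-line style options for the Flink runner (local setup assumed)."""
    args = [
        "--runner=FlinkRunner",
        # LOOPBACK runs the Python SDK worker inside the submitting process,
        # which is convenient for local experiments.
        "--environment_type=LOOPBACK",
    ]
    if flink_master is not None:
        # e.g. "localhost:8081" for an already running Flink cluster;
        # when omitted, Beam's default for --flink_master applies.
        args.append(f"--flink_master={flink_master}")
    if streaming:
        # Kafka is an unbounded source, so run the pipeline in streaming mode.
        args.append("--streaming")
    return args


def run_pipe(args: Optional[List[str]] = None) -> None:
    """The pipeline from the question, configured for a portable runner."""
    # Imported lazily so the helper above works without Beam installed.
    import apache_beam as beam
    from apache_beam.io.kafka import ReadFromKafka
    from apache_beam.options.pipeline_options import PipelineOptions

    options = PipelineOptions(args if args is not None else flink_pipeline_args())
    with beam.Pipeline(options=options) as p:
        (p
         | "Kafka Unbounded" >> ReadFromKafka(
             consumer_config={"bootstrap.servers": "localhost:9092"},
             topics=["beam-test"])
         | "Test Print" >> beam.Map(print))
```

Calling run_pipe() then submits the same read-and-print pipeline to Flink instead of the direct runner.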
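The trickier obstacle is that you need to start an expansion service before external transforms can be added to your pipeline. The stack trace you are getting occurs because Beam tries to expand the transform but cannot connect to the expansion service, so the expansion fails.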
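To connect the message itself to this: "Unsupported signal: 2" refers to a signal number, and Python's standard signal module maps 2 to SIGINT, the Ctrl+C interrupt. The last frames of the traceback show subprocess_server.py calling send_signal(signal.SIGINT) while shutting down the Java expansion service process (JavaJarServer), and on Windows subprocess.Popen.send_signal only accepts SIGTERM, CTRL_C_EVENT and CTRL_BREAK_EVENT and raises ValueError for any other signal. The sketch decodes the number and mirrors that Windows check; the helper names are hypothetical, and the CTRL_* fallback values 0 and 1 are their Windows values, hard-coded so the check also runs on other platforms.

```python
import signal

# Signals that subprocess.Popen.send_signal() accepts on Windows. The CTRL_*
# constants only exist in the signal module on Windows, so fall back to
# their Windows values (0 and 1) elsewhere.
WINDOWS_SEND_SIGNAL_ALLOWED = {
    int(signal.SIGTERM),
    int(getattr(signal, "CTRL_C_EVENT", 0)),
    int(getattr(signal, "CTRL_BREAK_EVENT", 1)),
}


def describe_signal(sig: int) -> str:
    """Map a numeric signal, e.g. the 2 in 'Unsupported signal: 2', to its name."""
    return signal.Signals(sig).name


def supported_on_windows(sig: int) -> bool:
    """Mirror the check Popen.send_signal() performs on Windows."""
    return int(sig) in WINDOWS_SEND_SIGNAL_ALLOWED


print(describe_signal(2))                    # SIGINT
print(supported_on_windows(signal.SIGINT))   # False
print(supported_on_windows(signal.SIGTERM))  # True
```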
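If you still want to try cross-language transforms despite the extra setup, the documentation I linked contains instructions for running an expansion service. At the time of writing, this feature is still new and the documentation may have gaps. If you run into problems, ask on the Apache Beam user mailing list or the Apache Beam Slack channel.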
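Once an expansion service is running, ReadFromKafka can be pointed at it through its expansion_service argument (a host:port address). When that argument is omitted, Beam launches a default Java expansion service on its own, which is what the JavaJarServer frames in the traceback are doing, and that requires a working Java installation. The sketch assumes the service listens on localhost:8097 (the port used in the comments on this answer); the helper names are hypothetical, and ReadFromKafka is imported lazily so the keyword-building part runs without Beam.

```python
from typing import Dict, Iterable, Optional


def kafka_read_kwargs(bootstrap_servers: str,
                      topics: Iterable[str],
                      expansion_service: Optional[str] = None) -> Dict[str, object]:
    """Keyword arguments for apache_beam.io.kafka.ReadFromKafka."""
    kwargs: Dict[str, object] = {
        "consumer_config": {"bootstrap.servers": bootstrap_servers},
        "topics": list(topics),
    }
    if expansion_service is not None:
        # Address of an expansion service you started yourself (host:port).
        kwargs["expansion_service"] = expansion_service
    return kwargs


def make_kafka_read(expansion_service: Optional[str] = "localhost:8097"):
    """Build the Kafka read transform from the question (requires apache_beam)."""
    from apache_beam.io.kafka import ReadFromKafka
    return ReadFromKafka(**kafka_read_kwargs(
        "localhost:9092", ["beam-test"], expansion_service))
```

Passing expansion_service=None to make_kafka_read falls back to Beam's own default service.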

Hi Daniel. When I add the default expansion service as ...expansion_service= "localhost:8097", I get the following error: grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with: status = StatusCode.UNAVAILABLE, details = "failed to connect to all addresses" debug_error_string = "{"created":"@1615306325.725000000","description":"Failed to pick subchannel","file":"src/core/ext/filters/client_channel/client_channel.cc","file_line":5397,"referenced_errors":[{"created":"@1615306325.725000000","description":"failed to connect to all addresses" - Induraj PR
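Without more context, this is most likely a port issue. Make sure all the ports you are trying to use are open. If you need more detailed help, though, I suggest creating a new Stack Overflow question with more context, or asking on the mailing list/Slack channel as I suggested above. - Daniel Oliveira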
Hi Daniel, please take a look at this post, which describes my error in detail: https://stackoverflow.com/questions/66560061/apache-beam-readfromkafka-throws-grpc-channel-inactiverpcerror - Induraj PR
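Daniel's suggestion to make sure the ports are open can be checked with a small standard-library sketch. It only tests whether a TCP connection to host:port succeeds; it cannot tell whether the listener is actually Kafka or an expansion service. The ports come from this thread (9092 for Kafka, 8097 for the expansion service), and is_port_open is a hypothetical helper name.

```python
import socket


def is_port_open(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to host:port can be established."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts (socket.timeout is an OSError
        # subclass) and host names that cannot be resolved.
        return False


# Ports used in this thread: the Kafka broker and the expansion service.
for port in (9092, 8097):
    print(port, is_port_open("localhost", port))
```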
