限制Beam应用程序中的一步骤

8
我正在使用Google Dataflow的Python Beam,我的管道如下所示:

从文件中读取图像URL >> 下载图像 >> 处理图像

问题在于我不能让“下载图像”步骤根据需要进行扩展,因为我的应用程序可能会被图像服务器阻塞。
是否有一种方法可以限制该步骤?无论是每分钟输入还是输出都可以。
谢谢。

这是一个有趣的问题。我会在周一之前尝试回答;) - Pablo
@Xitrum 你有解决这个问题吗,还是使用了下面的建议? - Spencer McCreary
@Pablo 你最终解决了这个问题吗? - Frederik Bode
2个回答

3

一种可能性,也许有点天真,是在步骤中引入睡眠。为此,您需要知道可以同时运行的 ParDo 实例的最大数量。如果 autoscalingAlgorithm 设置为 NONE,则可以从 numWorkersworkerMachineType(DataflowPipelineOptions)获取该值。准确地说,有效速率将被总线程数除以: desired_rate/(num_workers*num_threads(per worker))。睡眠时间将是该有效速率的倒数:

Integer desired_rate = 1; // QPS limit

if (options.getNumWorkers() == 0) { num_workers = 1; }
else { num_workers = options.getNumWorkers(); }

if (options.getWorkerMachineType() != null) { 
    machine_type = options.getWorkerMachineType();
    num_threads = Integer.parseInt(machine_type.substring(machine_type.lastIndexOf("-") + 1));
}
else { num_threads = 1; }

Double sleep_time = (double)(num_workers * num_threads) / (double)(desired_rate);

然后你可以在限制的函数内使用 TimeUnit.SECONDS.sleep(sleep_time.intValue()); 或等效的方法。在我的例子中,我想从公共文件中读取,解析出空行,并以最大1 QPS的速率调用自然语言处理API(我之前将desired_rate初始化为1):

p
    .apply("Read Lines", TextIO.read().from("gs://apache-beam-samples/shakespeare/kinglear.txt"))
    .apply("Omit Empty Lines", ParDo.of(new OmitEmptyLines()))
    .apply("NLP requests", ParDo.of(new ThrottledFn()))
    .apply("Write Lines", TextIO.write().to(options.getOutput()));

速率受限的Fn是ThrottledFn,请注意sleep函数:
static class ThrottledFn extends DoFn<String, String> {
    @ProcessElement
    public void processElement(ProcessContext c) throws Exception {

        // Instantiates a client
        try (LanguageServiceClient language = LanguageServiceClient.create()) {

          // The text to analyze
          String text = c.element();
          Document doc = Document.newBuilder()
              .setContent(text).setType(Type.PLAIN_TEXT).build();

          // Detects the sentiment of the text
          Sentiment sentiment = language.analyzeSentiment(doc).getDocumentSentiment();                 
          String nlp_results = String.format("Sentiment: score %s, magnitude %s", sentiment.getScore(), sentiment.getMagnitude());

          TimeUnit.SECONDS.sleep(sleep_time.intValue());

          Log.info(nlp_results);
          c.output(nlp_results);
        }
    }
}

使用此方法,如下图所示,我可以获得1个元素/秒的速率,并避免在使用多个工作者时达到配额限制,即使请求不是真正分散的(您可能会同时收到8个请求,然后休眠8秒等)。这只是一个测试,可能更好的实现方式是使用Guava的rateLimiter

enter image description here

如果管道正在使用自动缩放(THROUGHPUT_BASED),则会更加复杂,应该更新工作人员的数量(例如,Stackdriver Monitoring 具有 job/current_num_vcpus 指标)。其他一般考虑因素包括使用虚拟 GroupByKey 控制并行 ParDo 数量或使用 splitIntoBundles 分割源等。我希望看到是否有其他更好的解决方案。

只是想提一下,今晚我尝试使用Guava的速率限制器来完成这个任务(使用Spotify为scio core发布的代码)。它运行得相当不错,除了在日志中看到一些警告消息,指出某个步骤长时间没有输出(这是有道理的)。我不确定那是否会引起问题。有关详细信息,请参见https://stackoverflow.com/questions/69082498/how-does-pubsubio-in-an-apache-beam-streaming-pipeline-know-not-to-pull-more-m。 - Matt Welke

0

可能最简单的方法是(暂时)减少作业的并行度。您可以使用beam.Reshuffle转换来实现。在您的示例中,它应该是这样的:

    | "Read image urls from file" >> beam.Map(read_function)
    | "Reshuffle to throttle API calls" >> beam.Reshuffle(num_buckets=3)
    | "Download images" >> beam.Map(your_download_function)
    | "Process images"  >> beam.Map(your_process_function)

num_buckets 传递的参数应该等于您可以承担的并发 API 调用次数。

您可能需要在“下载图像”步骤之后添加另一个 Reshuffle(使用更高的 num_buckets 值)来恢复作业的并行性并打开您通过第一个 Reshuffle 创建的瓶颈。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接