我有一个使用Pub/Sub订阅作为无界源的数据流处理工作。我想知道在哪个阶段数据流处理会确认收到Pub/Sub消息。如果在数据流处理管道的任何阶段抛出异常,似乎消息就会丢失。 此外,我想知道如何编写最佳实践的数据流处理管道,以便在失败时从Pub/Sub无界源中检索消息。谢谢!
我有一类计算需要使用图形结构来描述。这个图形结构非常复杂,有多个输入、分叉节点和需要其他多个节点结果的节点。在所有计算中,还可能存在多个汇点。但是,这个图不会出现任何环路。输入节点会被更新,值会通过(目前纯粹概念性的)图进行传递。节点保留状态,随着输入的变化而变化,计算必须按照输入的顺序进行。...
我正在使用Google Dataflow的Python Beam,我的管道如下所示: 从文件中读取图像URL >> 下载图像 >> 处理图像 问题在于我不能让“下载图像”步骤根据需要进行扩展,因为我的应用程序可能会被图像服务器阻塞。 是否有一种方法可以限制该步...
我有一个使用案例,需要捕获从一个API到另一个API的数据流。例如,我的代码使用Hibernate从数据库读取数据,在数据处理期间我将一个POJO转换为另一个,并执行一些更多的处理,最后将其转换为最终结果Hibernate对象。简而言之,是像POJO1到POJO2到POJO3这样的东西。 在...
我想要实现类似这样的功能:Beam/Dataflow中的批处理PCollection 以上链接中的答案是用Java编写的,而我使用的语言是Python。因此,我需要一些帮助来获得类似的构造。 具体来说,我有这个: p = beam.Pipeline (options = pipeline_...
我在我的应用程序中使用TPL Dataflow实现了生产者/消费者模式。我有一个大的数据流网格,其中约有40个块。该网格有两个主要的功能部分:生产者部分和消费者部分。当消费者处理一些指定数量的工作项时,生产者应继续为消费者提供大量的工作。否则,应用程序会消耗大量内存/CPU并且行为不可持续。我...
一些背景知识,然后才是真正的问题: 我正在开发一个后端应用程序,由几个不同模块组成。每个模块目前都是一个命令行Java应用程序,可以根据需要“按需”运行(稍后会有更多详细信息)。 每个模块是一个“步骤”,是可以视为数据流的更大过程的一部分;第一步从外部源收集数据文件并将其推送/加载到某些S...
我们使用“系统延迟”来检查我们的Dataflow作业的健康状况。例如,如果我们看到系统延迟增加,我们会尝试找出如何降低此指标。关于这个度量标准有一些问题。 1)什么是系统延迟的确切含义? 答:数据项等待处理的最长时间。 以上是我们在GCP控制台上点击信息图标时看到的内容。在这种情况下,“...
我正在使用数据流中的FlatFile Source Manager,Script COmponent as Trans和OLEDB destination。源从平面文件中读取所有行,我想跳过最后一行(尾记录)并更新数据库。由于它包含NULL值,数据库会抛出错误。请帮助我解决这个问题。 谢谢,...