18得票4回答
谷歌云数据流作业最简单的调度方式

我只需要每天运行一个数据流管道,但建议的解决方案(如需要构建整个 Web 应用程序的 App Engine Cron 服务)似乎有些过于复杂了。 我在考虑是否只需要从 Compute Engine Linux VM 的 cron 工作中运行该管道,但也许这太简单了 :)。那么这种方法有什么问题...

17得票2回答
使用DoFn在Cloud Dataflow中从PubSub将数据写入Google Cloud Storage

我正在尝试使用Google Cloud Dataflow将Google PubSub消息写入Google Cloud Storage。我知道TextIO/AvroIO不支持流水线,但是我在[1]中读到作者的评论中提到可以使用ParDo/DoFn在流水线中向GCS写入数据。我尽可能地按照他们的文...

17得票1回答
Apache Beam/Dataflow Reshuffle

org.apache.beam.sdk.transforms.Reshuffle的目的是什么?文档中定义了以下目的: 一种PTransform,返回与其输入等效的PCollection,但在操作上提供了一些GroupByKey的副作用,特别是防止周围转换的融合、通过id进行检查点和去重...

16得票1回答
PubsubIO在GCD上运行时的水印启发式算法是什么?

嗨,我正在尝试运行一个流水线,其中我正在计算发布到pubsub的消息之间的差异,这些消息以30秒心跳(10K个流,每个心跳每30秒)发布。我不关心100%的数据完整性,但我想了解PubsubIO的水印启发式是什么(以及是否可以调整它),以确定是否可以忽略具有足够低丢失率的晚期数据。 注:如果...

14得票2回答
谷歌数据流式管道在窗口化之后没有将工作负载分配到多个工作器上

我正在尝试在Python中设置数据流式处理管道。我有很多批处理管道的经验。我们的基本架构看起来像这样: 第一步是进行一些基本处理,每条消息需要大约2秒钟才能到达窗口。我们使用3秒的滑动窗口和3秒的间隔(稍后可能会更改,以便我们具有重叠的窗口)。作为最后一步,我们有SOG预测,需要大约15秒...

13得票1回答
Apache Beam在Dataflow中的大型侧边输入

这与这个问题最相似。 我正在Dataflow 2.x中创建一个流水线,该流水线从Pubsub队列接收流输入。每个消息都需要通过来自Google BigQuery的非常大的数据集进行流式处理,并将所有相关值(基于键)附加到它之前写入数据库。 问题在于BigQuery的映射数据集非常大-任何尝...

13得票1回答
如何在没有列名或模式的情况下将CSV文件导入BigQuery表格?

我正在编写一个Java工具,将GCS中的几个CSV文件导入到BigQuery中。我可以通过bq load轻松实现此目标,但我想使用Dataflow作业来完成它。因此,我正在使用Dataflow的Pipeline和ParDo转换器(返回TableRow以应用于BigQueryIO),并创建了St...

13得票1回答
如何在Dataflow/Beam中将流数据与大型历史数据集相结合

我正在使用Google Dataflow/Apache Beam处理Web用户会话日志,并需要将用户的日志(实时数据流)与上个月的用户会话历史记录结合起来。 我已经研究了以下方法: 使用30天固定窗口:窗口太大,不能全部装入内存,并且我不需要更新用户的历史记录,只需引用它。 使用CoGr...

13得票1回答
Dataflow何时确认来自PubSubIO的批处理消息中的项目?

这个主题有一个问题,答案说:“在消息在Dataflow流水线的某个地方持久化之后,确认将被执行。”这在概念上是有道理的,但是我不确定在消息被反序列化和转换为持久负载之前,Dataflow如何跟踪一条消息。 在我们的情况下,PubSub消息包含一批项目。接收并反序列化消息后,我们会将批处理拆分...

13得票1回答
使用SSH密钥在Dataflow工作节点上拉取私有库

我正在设置一个数据流作业,这个作业需要工作人员访问私有的Bitbucket存储库以安装处理数据的库。为了授予数据流工作者访问权限,我已经设置了一对SSH密钥(公钥和私钥)。我设法将私钥传输到我的数据流工作者上。当试图通过git+ssh安装软件包时,我遇到了一个错误Host key verifi...