9得票2回答
Dataflow / Apache Beam - 当传入模式时如何访问当前文件名?

我之前在stackoverflow上看到过这个问题的答案(https://dev59.com/TIrda4cB1Zd3GeqPLV18),但是自从Apache Beam为Python添加可分割dofn功能后,就没有更新了。当我向gcs存储桶传递文件模式时,如何访问当前正在处理的文件的文件名? ...

9得票1回答
Apache Beam BigQueryIO 写入速度慢

我的 Beam pipeline 在将数据写入未分区的 BigQuery 目标表。PCollection 包含成千上万的 TableRows。如果我使用 DirectRunner 运行,BigQueryIO 显然会首先在 BigQueryWriteTemp 临时文件夹中为每个记录创建一个临时文...

9得票1回答
Apache Beam:DoFn和SimpleFunction有什么区别?

在阅读有关使用Java在Apache Beam中处理流元素的过程中,我遇到了DoFn<InputT,OutputT>,然后又遇到了SimpleFunction<InputT,OutputT>。 这两者对我来说看起来很相似,我发现很难理解它们之间的区别。 可以有人用通...

9得票1回答
Dataflow / Apache Beam 在哪个阶段确认 pub/sub 消息?

我有一个使用Pub/Sub订阅作为无界源的数据流处理工作。我想知道在哪个阶段数据流处理会确认收到Pub/Sub消息。如果在数据流处理管道的任何阶段抛出异常,似乎消息就会丢失。 此外,我想知道如何编写最佳实践的数据流处理管道,以便在失败时从Pub/Sub无界源中检索消息。谢谢!

9得票3回答
Apache Beam:Python SDK中的DoFn.Setup等效项

在Beam Python DoFn中,如何进行昂贵的一次性初始化?Java SDK使用DoFn.Setup,但似乎在Beam Python中没有相应的方法。 目前最好的方式是在DoFn初始化器中将对象附加到threading.local()吗?

9得票3回答
使用Dataflow读取CSV文件头信息

我有一个CSV文件,但我不知道列名。我需要在Google Dataflow中进行一些转换后将数据输出为JSON格式。 那么,最好的方法是什么呢?如何将标题行的标签应用到所有行中? 例如: a,b,c 1,2,3 4,5,6 ...变成(大约): {a:1, b:2, c:3} {a...

9得票1回答
将TensorFlow Transform应用于生产中的特征转换/缩放。

概述 我按照下面的指南编写了TF Records,其中我使用了tf.Transform来预处理我的特征。现在,我想要部署我的模型,为此需要在真实数据上应用这个预处理函数。 我的方法 首先,假设我有2个特征: features = ['amount', 'age'] 我有来自Apac...

9得票5回答
如何在Apache Beam中写入多个文件?

让我简要说明我的情况。我正在使用Apache Beam 0.6.0。我的最终处理结果是PCollection<KV<String,String>>。我想将值写入相应的不同文件中。 例如,假设结果包括 (key1, value1) (key2, value2) (ke...

9得票2回答
Apache Beam 和 CombineFn 对于程序员来说是什么问题?

我们正在使用Apache Beam和DirectRunner作为运行器来构建一个管道。我们目前正在尝试一个简单的管道,其中我们: 从Google Cloud Pub/Sub中提取数据(当前使用仿真器在本地运行) 反序列化为Java对象 使用1分钟的固定窗口对事件进行分窗 使用自定义Comb...

9得票6回答
谷歌数据流上的Apache Beam示例权限错误

我在将Apache Beam示例从本地机器提交到我们的云平台时遇到了麻烦。 通过使用gcloud auth list,我可以看到正确的帐户当前处于活动状态。我可以使用gsutil和Web客户端与文件系统交互。我可以使用云shell通过python REPL运行管道。 但是,当我尝试运行Py...