16得票1回答
使用Google Pub/Sub更新单例HashMap

我有一个使用情况,需要初始化一个HashMap,其中包含一组查找数据(关于IoT设备的物理位置等信息)。这些查找数据作为第二个数据集的参考数据,该数据集是PCollection。这个PCollection是一个数据流,提供IoT设备记录的数据。来自IoT设备的数据流使用作为Google Dat...

7得票2回答
缓冲和刷新Apache Beam流数据

我有一个流式作业,初始运行时需要处理大量数据。其中一个DoFn调用远程服务支持批量请求,因此在处理有界集合时,我使用以下方法: private static final class Function extends DoFn<String, Void> implements ...

9得票1回答
gcloud auth activate-service-account --key-file和GOOGLE_APPLICATION_CREDENTIALS有什么区别?

我正在创建一个shell脚本来处理一些工作流程的自动化,其中包括通过Apache Beam GCP访问Google Buckets。我正在使用包含我的服务帐户信息的.json文件,在什么情况下需要使用: gcloud auth activate-service-account --key-...

9得票1回答
将TensorFlow Transform应用于生产中的特征转换/缩放。

概述 我按照下面的指南编写了TF Records,其中我使用了tf.Transform来预处理我的特征。现在,我想要部署我的模型,为此需要在真实数据上应用这个预处理函数。 我的方法 首先,假设我有2个特征: features = ['amount', 'age'] 我有来自Apac...

8得票1回答
如何在Apache Beam中计算标准差

我是一名新手Apache Beam的用户,并希望能够计算大型数据集的平均数和标准差。 给定一个以"A,B"形式的csv文件,其中A、B为整数,这基本上就是我的需求。 import apache_beam as beam from apache_beam.options.pipeline_o...

7得票2回答
如何使用Python从Pub/Sub到GCS创建一个Dataflow数据管道

我希望使用Dataflow将数据从Pub/Sub移动到GCS。基本上,我希望Dataflow在固定时间内(例如15分钟)累积一些消息,然后在经过该时间后将这些数据作为文本文件写入GCS。 我的最终目标是创建一个自定义管道,因此“Pub/Sub to Cloud Storage”模板对我来说不...

7得票2回答
在Apache Beam中监控与某个文件模式匹配的新文件

我有一个目录存储在GCS或其他支持的文件系统中,外部进程会向其中写入新文件。 我想编写一个Apache Beam流水线,持续监视该目录以获取新文件,并在其到达时读取和处理每个新文件。这是否可行?

12得票1回答
import apache_beam的metaclass冲突

当我尝试导入Apache Beam时,我会遇到以下错误。>>> import apache_beam Traceback (most recent call last): File "<stdin>", line 1, in <module> ...

21得票4回答
使用Dataflow和Cloud Composer的区别

我希望能够澄清一下,是 Cloud Dataflow 还是 Cloud Composer 更适合这项工作,但我在 Google 文档中并没有得到明确的答案。 目前,我正在使用 Cloud Dataflow 读取一个非标准的 csv 文件——做一些基本处理——然后将其加载到 BigQuery ...

13得票3回答
如何使用Beam读取大型CSV文件?

我正在尝试使用Apache Beam来读取大型CSV文件。所谓“大型”是指几个GB的文件(一次性将整个CSV文件读入内存则不切实际)。 到目前为止,我尝试了以下选项: - 使用TextIO.read():这样做是不行的,因为带引号的CSV字段可能包含换行符。此外,这会尝试一次性将整个文件读...