在没有本地VPC网络的项目中启动Dataflow作业(2018-07-16_04_25_02-6605099454046602382)时,遇到以下错误 Workflow failed. Causes: Network default is not accessible to Dataflo...
我刚开始接触Google Data Flow,编写了一个从云存储读取CSV文件的简单流程。其中一个步骤涉及调用Web服务来丰富结果。所涉及的Web服务在批量发送几百个请求时表现更佳。 在查看API时,我没有发现将PCollection的100个元素聚合成单个Par.do Execution的...
假设我有一个PCollection<Foo>,我想将其写入多个BigQuery表格,并为每个Foo选择不同的表格。使用Apache Beam的BigQueryIO API,我该如何实现?
我目前正在开发一个ETL数据流作业(使用Apache Beam Python SDK),它从CloudSQL(使用psycopg2和自定义的ParDo)查询数据并将其写入BigQuery。我的目标是创建一个Dataflow模板,我可以使用Cron作业从AppEngine启动。 我有一个本地工...
我想利用新的BigQuery时间分区表功能,但不确定在Dataflow SDK的1.6版本中是否可以实现。查看BigQuery JSON API,要创建一个按天分区的表,需要传入一个"timePartitioning": { "type": "DAY" } 虽然选项很多,但是com.googl...
我们正在构建相当复杂的Dataflow作业,从流式数据源计算模型。特别是,我们有两个模型共享一些指标,并且大致上基于相同的数据源进行计算。这些作业在稍微较大的数据集上执行联接。 你有关于如何设计这种类型作业的任何指南吗?我们需要考虑哪些指标、行为或其他因素来做出决策? 以下是我们考虑的一些...
我们正在尝试在Apache Beam管道上(使用DirectRunner)使用固定窗口。我们的流程如下: 从pub/sub中获取数据 将JSON反序列化为Java对象 使用5秒的固定窗口对事件进行分窗 使用自定义CombineFn,将每个Event窗口组合成一个List<Event&...
Apache Beam 似乎拒绝识别 Kotlin 的 Iterable。以下是示例代码: @ProcessElement fun processElement( @Element input: KV<String, Iterable<String>>, re...
我使用的是zsh,为了通过Mac本地终端与GCP进行交互,我已安装了gcloud。我遇到了这个错误:"zsh:没有找到匹配项:apache-beam [gcp]"。然而,在GCP控制台上直接在bash终端运行该命令时,并没有出现这样的错误。我该怎么处理?谢谢。
使用Python SDK和Dataflow Runner在Apache Beam中从多个文件夹读取文件,然后将文件内容与文件名(filecontents,filename)一起输出到BigQuery。最初的想法是为每个文件创建一个PCollection,然后将文件内容与文件名映射起来。def ...