尝试通过Dataflow访问Google Cloud Datastore时出现403错误

3

我有一个谷歌应用引擎应用程序,数据存储在谷歌云数据存储中。我想使用Dataflow将部分数据放入BigQuery,但我打算从仅获取一些来自Datastore的信息并将其写入Google Cloud Storage开始。我的代码如下:

public class DatastorePipeline {
    private static final Logger LOG = LoggerFactory.getLogger(DatastorePipeline.class);

static class GetEmailFn extends DoFn<Entity, String> {

    @Override
    public void processElement(ProcessContext c) throws Exception {
        Map<String, Value> properties = DatastoreHelper.getPropertyMap(c.element());
        Value value = properties.get("email_address");
        if(value != null) {
            c.output(DatastoreHelper.getString(value));
        }
    }
}

    public static void main(String[] args) {
        Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).withValidation().create());

        Query.Builder q = Query.newBuilder();
        q.addKindBuilder().setName("User");
        Query query = q.build();

        DatastoreIO.Source source = DatastoreIO.source()
        .withDataset("my-project-id")
        .withQuery(query);

        p.apply("ReadUsersFromDatastore", Read.from(source))
        .apply(ParDo.named("GetEmailAddress").of(new GetEmailFn()))
        .apply(TextIO.Write.to("gs://dataflow-output-bucket/emails.txt"));

        p.run();
    }
}

然而,当我尝试运行这个代码时,我在执行Datastore查询时遇到了403错误:
Request failed with code 403, will NOT retry: https://www.googleapis.com/datastore/v1beta2/datasets/my-project-id/runQuery

我正在使用Google Cloud Dataflow插件从Eclipse运行此程序。在没有Datastore读取的数据流作业中运行正常。我进行了

gcloud auth login

在运行作业之前,需要像教程中描述的那样进行配置。我做错了什么?
编辑:下面是完整的堆栈跟踪。
Oct 11, 2015, 12:03:13 PM (b6119cca307b4d9a): com.google.api.services.datastore.client.DatastoreException: Unauthorized. at com.google.api.services.datastore.client.RemoteRpc.makeException(RemoteRpc.java:115) at com.google.api.services.datastore.client.RemoteRpc.call(RemoteRpc.java:81) at com.google.api.services.datastore.client.BaseDatastoreFactory$RemoteRpc.call(BaseDatastoreFactory.java:41) at com.google.api.services.datastore.client.Datastore.runQuery(Datastore.java:109) at com.google.api.services.datastore.client.QuerySplitterImpl.getScatterKeys(QuerySplitterImpl.java:189) at com.google.api.services.datastore.client.QuerySplitterImpl.getSplits(QuerySplitterImpl.java:75) at com.google.cloud.dataflow.sdk.io.DatastoreIO$Source.getSplitQueries(DatastoreIO.java:427) at com.google.cloud.dataflow.sdk.io.DatastoreIO$Source.splitIntoBundles(DatastoreIO.java:306) at com.google.cloud.dataflow.sdk.runners.dataflow.BasicSerializableSourceFormat.performSplit(BasicSerializableSourceFormat.java:318) at com.google.cloud.dataflow.sdk.runners.dataflow.BasicSerializableSourceFormat.performSourceOperation(BasicSerializableSourceFormat.java:167) at com.google.cloud.dataflow.sdk.runners.worker.SourceOperationExecutor.execute(SourceOperationExecutor.java:80) at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.executeWork(DataflowWorker.java:257) at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.doWork(DataflowWorker.java:193) at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.getAndPerformWork(DataflowWorker.java:146) at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.doWork(DataflowWorkerHarness.java:164) at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.call(DataflowWorkerHarness.java:145) at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.call(DataflowWorkerHarness.java:132) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745)
答案:问题在于我的项目基于公司的域名限制访问,这是导致服务帐户无法连接的原因。感谢Dan帮助我解决了这个问题!

很抱歉你遇到了这个问题。你能否编辑问题,包括完整的错误堆栈跟踪? - jkff
看起来你正在使用 DataflowPipelineRunner(或者是 Blocking 变体)。是否尝试使用 DirectPipelineRunner 运行,看看是否会出现相同的错误? - Dan Halperin
此外,这个错误是每次都发生,而不是偶尔发生的,对吗? - Dan Halperin
1
这个错误每次都会发生,而且只会在DataflowPipeLineRunner/Blocking runner上出现。在DirectPipelineRunner上不会发生。 - Herbert Lee
1个回答

2
看起来您的Datastore权限配置不正确。
以下是两个通用建议:
  1. 有必要查看Google Cloud Dataflow安全性和权限文档。
  2. Datastore是否在您运行作业的同一项目中创建?
然而,在您的情况下,您遇到了以下错误:
  1. 相关的 AppEngine 项目是否被锁定在特定域名的所有用户中?如果是,那么在当前 Cloud Datastore 的 beta 版本中存在一个问题,防止 Dataflow 服务帐户(电子邮件以 @cloudservices.gserviceaccount.com 结尾)访问数据。

    我们可以应用一个临时解决方案来解决这个问题,如果您正在使用 OAuth API,则会有一些小成本。解决方案将不再强制要求用户来自您应用程序的域。如果这对您来说是一个重要的要求,您可以在代码中进行域强制执行。(常规用户 API 不受影响。)

    要求我们应用临时解决方案,您可以发送电子邮件至 dataflow-feedback@google.com,引用此问题并包括您的数字项目 ID。


1
是的,Datastore是在运行作业的同一项目中创建的。我已经查看了那份文件,但没有看到需要在任何地方设置我的Datastore权限。 - Herbert Lee
1
另外,我不知道这是否有影响,但我没有该项目的所有者权限,只有编辑权限。 - Herbert Lee
由于您不是所有者,我怀疑您遇到了我们之前未处理过的权限角落情况。能否请您发送一封电子邮件至dataflow-feedback@google.com?如有可能,请包括一个失败的作业ID(类似于2015-10-14_09_02_25-12818314609033999211)和您的数字项目编号。 - Dan Halperin
顺便说一下,赫伯特,如果这个答案解决了你的问题,能否请您接受这个答案? - Dan Halperin

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接