使用Pyspark脚本将BigQuery中的表加载到Spark集群

4
我有一个数据表在bigquery中加载,我想通过一个pyspark .py文件将其导入到我的spark集群中。
我在Dataproc + BigQuery examples - any available?中看到了一种用scala将bigquery表加载到spark集群的方法,但是是否有一种方法可以在pyspark脚本中完成?
1个回答

5

这是来自@MattJ在这个问题中的内容。下面是一个在Spark中连接到BigQuery并执行单词统计的例子。

import json
import pyspark
sc = pyspark.SparkContext()

hadoopConf=sc._jsc.hadoopConfiguration()
hadoopConf.get("fs.gs.system.bucket")

conf = {"mapred.bq.project.id": "<project_id>", "mapred.bq.gcs.bucket": "<bucket>",
    "mapred.bq.input.project.id": "publicdata", 
    "mapred.bq.input.dataset.id":"samples", 
    "mapred.bq.input.table.id": "shakespeare"  }

tableData = sc.newAPIHadoopRDD(
    "com.google.cloud.hadoop.io.bigquery.JsonTextBigQueryInputFormat",
    "org.apache.hadoop.io.LongWritable", "com.google.gson.JsonObject", 
    conf=conf).map(lambda k: json.loads(k[1])).map(lambda x: (x["word"],
    int(x["word_count"]))).reduceByKey(lambda x,y: x+y)

print tableData.take(10)

您需要更改<project_id><bucket>以匹配您项目的设置。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接