How do I write a streaming DataFrame to PostgreSQL?

I have a streaming DataFrame and want to write it to a database. There is documentation on how to write an RDD or a DataFrame to Postgres, but I can't find an example or documentation on how to do this with Structured Streaming.
I've read the docs at https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#foreachbatch, but I can't work out where to create the JDBC connection or how to write to the database.
def foreach_batch_function(df, epoch_id):
    # what goes in here?
    pass

view_counts_query = windowed_view_counts.writeStream \
    .outputMode("append") \
    .foreachBatch(foreach_batch_function) \
    .option("truncate", "false") \
    .trigger(processingTime="5 seconds") \
    .start() \
    .awaitTermination()

This function takes a regular DataFrame and writes it to a Postgres table:

def postgres_sink(config, data_frame):
    config.read('/src/config/config.ini')
    dbname = config.get('dbauth', 'dbname')
    dbuser = config.get('dbauth', 'user')
    dbpass = config.get('dbauth', 'password')
    dbhost = config.get('dbauth', 'host')
    dbport = config.get('dbauth', 'port')

    url = "jdbc:postgresql://"+dbhost+":"+dbport+"/"+dbname
    properties = {
        "driver": "org.postgresql.Driver",
        "user": dbuser,
        "password": dbpass
    }

    data_frame.write.jdbc(url=url, table="metrics", mode="append",
                          properties=properties)
2 Answers


An example of Postgres ingestion with Structured Streaming:

class PostgreSqlSink(url: String, user: String, pwd: String) extends org.apache.spark.sql.ForeachWriter[org.apache.spark.sql.Row] {
  val driver = "org.postgresql.Driver"
  var connection: java.sql.Connection = _
  var statement: java.sql.PreparedStatement = _
  val v_sql = "insert INTO Table (A,B,C) values ( ?, ?, ?)"

  def open(partitionId: Long, version: Long): Boolean = {
    Class.forName(driver)
    connection = java.sql.DriverManager.getConnection(url, user, pwd)
    connection.setAutoCommit(false)
    statement = connection.prepareStatement(v_sql)
    true
  }
  def process(value: org.apache.spark.sql.Row): Unit = {
    // ignoring value(0) as this is address
    statement.setString(1, value(1).toString)
    statement.setString(2, value(2).toString)
    statement.setString(3, value(3).toString)
    statement.executeUpdate()        
  }
  def close(errorOrNull: Throwable): Unit = {
    connection.commit()
    connection.close
  }
}

import org.apache.spark.sql.streaming.Trigger

val url = "jdbc:postgresql://XX.XX.XX.XX:5432/postgres"
val user = "abc"
val pw = "abc@123"
val jdbcWriter = new PostgreSqlSink(url,user,pw)
val writeData = pg_df.writeStream
    .foreach(jdbcWriter)
    .outputMode("append")
    .trigger(Trigger.ProcessingTime("30 seconds"))
    .option("checkpointLocation", "s3a://bucket/check")
    .start()

writeData.awaitTermination
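
The question is written in PySpark, while the writer above is Scala. A rough PySpark equivalent of the same idea, using DataStreamWriter.foreach with an open/process/close writer object and psycopg2, might look like this (a sketch only; the column names and connection details are placeholders):

import psycopg2

class PostgresRowWriter:
    """Writes each streaming row to Postgres; one connection per partition."""

    def __init__(self, host, port, dbname, user, password):
        self._conn_args = dict(host=host, port=port, dbname=dbname,
                               user=user, password=password)
        self._conn = None
        self._cur = None

    def open(self, partition_id, epoch_id):
        # Called once per partition and epoch, before process().
        self._conn = psycopg2.connect(**self._conn_args)
        self._cur = self._conn.cursor()
        return True

    def process(self, row):
        # Columns a, b, c mirror the A, B, C columns in the Scala example.
        self._cur.execute(
            "INSERT INTO metrics (a, b, c) VALUES (%s, %s, %s)",
            (row[0], row[1], row[2]))

    def close(self, error):
        # Commit only if the partition completed without an error.
        if self._conn is not None:
            if error is None:
                self._conn.commit()
            self._conn.close()

query = (windowed_view_counts.writeStream
    .foreach(PostgresRowWriter("XX.XX.XX.XX", 5432, "postgres", "abc", "abc@123"))
    .outputMode("append")
    .trigger(processingTime="30 seconds")
    .option("checkpointLocation", "/tmp/checkpoints/metrics")
    .start())
query.awaitTermination()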

The OP is using PySpark. - Hrvoje
What happens when the Postgres database can't be reached, e.g. during a database failover? - Don Srinath


There really isn't much to do here beyond what you already have. foreachBatch takes a function (DataFrame, Int) => None, so all you need is a small adapter and everything else should work as is:

def foreach_batch_for_config(config):
    def _(df, epoch_id):
        postgres_sink(config, df)
    return _

view_counts_query = (windowed_view_counts
    .writeStream
    .outputMode("append") 
    .foreachBatch(foreach_batch_for_config(some_config))
    ...
    .start()
    .awaitTermination())

To be honest, passing the ConfigParser around like this is a strange idea in the first place. You could adjust the signature and initialize it in place:

def postgres_sink(data_frame, batch_id):
    config = configparser.ConfigParser()
    ...
    data_frame.write.jdbc(...)

and keep the rest unchanged. That way you can use your function directly:

...
.foreachBatch(postgres_sink)
...
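
For completeness, a self-contained version of that sink, reusing the config keys and JDBC options from the question, could look like this (a sketch; it has not been run against a live database):

import configparser

def postgres_sink(data_frame, batch_id):
    # Each micro-batch arrives here as a regular DataFrame, so the plain
    # batch JDBC writer can be used.
    config = configparser.ConfigParser()
    config.read('/src/config/config.ini')
    dbname = config.get('dbauth', 'dbname')
    dbuser = config.get('dbauth', 'user')
    dbpass = config.get('dbauth', 'password')
    dbhost = config.get('dbauth', 'host')
    dbport = config.get('dbauth', 'port')

    url = "jdbc:postgresql://{}:{}/{}".format(dbhost, dbport, dbname)
    properties = {
        "driver": "org.postgresql.Driver",
        "user": dbuser,
        "password": dbpass,
    }
    data_frame.write.jdbc(url=url, table="metrics", mode="append",
                          properties=properties)

view_counts_query = (windowed_view_counts
    .writeStream
    .outputMode("append")
    .foreachBatch(postgres_sink)
    .trigger(processingTime="5 seconds")
    .start())
view_counts_query.awaitTermination()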

How do you handle a database failover? - Don Srinath
