Spark从Postgres JDBC表读取速度慢

7

我正在尝试从PostgreSQL数据库中加载约1M行到Spark。使用Spark需要大约10秒钟。但是,使用psycopg2驱动程序加载相同的查询只需要2秒钟。我正在使用postgresql jdbc驱动程序版本42.0.0

def _loadFromPostGres(name):
    url_connect = "jdbc:postgresql:"+dbname
    properties = {"user": "postgres", "password": "postgres"}
    df = SparkSession.builder.getOrCreate().read.jdbc(url=url_connect, table=name, properties=properties)
    return df

df = _loadFromPostGres("""
    (SELECT "seriesId", "companyId", "userId", "score" 
    FROM user_series_game 
    WHERE "companyId"=655124304077004298) as
user_series_game""")

print measure(lambda : len(df.collect()))

输出结果是 -
--- 10.7214591503 seconds ---
1076131

使用 psycopg2 -

import psycopg2
conn = psycopg2.connect(conn_string)
cur = conn.cursor()

def _exec():
    cur.execute("""(SELECT "seriesId", "companyId", "userId", "score" 
        FROM user_series_game 
        WHERE "companyId"=655124304077004298)""")
    return cur.fetchall()
print measure(lambda : len(_exec()))
cur.close()
conn.close()

输出结果为 -
--- 2.27961301804 seconds ---
1076131

度量函数 -
def measure(func) :
    start_time = time.time()
    x = func()
    print("--- %s seconds ---" % (time.time() - start_time))
    return x

请帮我找出这个问题的原因。


编辑1

我进行了更多的基准测试。使用Scala和JDBC -

import java.sql._;
import scala.collection.mutable.ArrayBuffer;

def exec() {

val url = ("jdbc:postgresql://prod.caumccqvmegm.ap-southeast-1.rds.amazonaws.com/prod"+ 
    "?tcpKeepAlive=true&prepareThreshold=-1&binaryTransfer=true&defaultRowFetchSize=10000")

val conn = DriverManager.getConnection(url,"postgres","postgres");

val sqlText = """SELECT "seriesId", "companyId", "userId", "score" 
        FROM user_series_game 
        WHERE "companyId"=655124304077004298"""

val t0 = System.nanoTime()

val stmt = conn.prepareStatement(sqlText, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)

val rs = stmt.executeQuery()

val list = new ArrayBuffer[(Long, Long, Long, Double)]()

while (rs.next()) {
    val seriesId = rs.getLong("seriesId")
    val companyId = rs.getLong("companyId")
    val userId = rs.getLong("userId")
    val score = rs.getDouble("score")
    list.append((seriesId, companyId, userId, score))
}

val t1 = System.nanoTime()

println("Elapsed time: " + (t1 - t0) * 1e-9 + "s")

println(list.size)

rs.close()
stmt.close()
conn.close()
}

exec()

输出结果为 -
Elapsed time: 1.922102285s
1143402

当我在Spark+Scala中使用collect()函数时 -
import org.apache.spark.sql.SparkSession

def exec2() {

    val spark = SparkSession.builder().getOrCreate()

    val url = ("jdbc:postgresql://prod.caumccqvmegm.ap-southeast-1.rds.amazonaws.com/prod"+ 
    "?tcpKeepAlive=true&prepareThreshold=-1&binaryTransfer=true&defaultRowFetchSize=10000")

    val sqlText = """(SELECT "seriesId", "companyId", "userId", "score" 
        FROM user_series_game 
        WHERE "companyId"=655124304077004298) as user_series_game"""

    val t0 = System.nanoTime()

    val df = spark.read
          .format("jdbc")
          .option("url", url)
          .option("dbtable", sqlText)
          .option("user", "postgres")
          .option("password", "postgres")
          .load()

    val list = df.collect()

    val t1 = System.nanoTime()

    println("Elapsed time: " + (t1 - t0) * 1e-9 + "s")

    print (list.size)
}

exec2()

输出结果为:
Elapsed time: 1.486141076s
1143445

因此,Python序列化会花费4倍的额外时间。我知道会有一些代价,但这似乎太多了。

1个回答

4
这个原因非常简单,有两个同时存在的原因。
首先我会给你一个关于 `psycopg2` 如何工作的视角。
这个库 `psycopg2` 的工作方式与连接到 RDMS 的任何其他库相同。该库将查询发送到您的 Postgres 引擎,然后将返回数据给您。就这么直接了当。
连接 -> 查询 -> 返回数据 -> 获取数据
当您使用 Spark 时,有两种略微不同的方式。Spark 不像运行在单个线程中的编程语言那样工作。它有一个分布式系统来工作。即使您在本地机器上运行,也是如此。请注意,Spark 有一个基本概念:驱动程序(主节点)和工作节点。
驱动程序接收要执行查询的请求,驱动程序不会为每个工作节点请求信息,而是从您的 Postgres 中请求信息。
如果您查看此处的文档,您会看到以下注释:
不要在大型集群上并行创建太多分区;否则,Spark 可能会崩溃您的外部数据库系统。
这个注释意味着每个工作节点都负责请求您的 postgres 数据。这是启动此过程的一个小开销,但真正的大问题是将数据发送到每个工作节点。
第二点,您在此代码部分中的收集:
print measure(lambda : len(df.collect()))

收集函数会向所有工作线程发送命令,以将数据发送到驱动程序。为了将其存储在驱动程序的内存中,它类似于Reduce,它创建了一个过程中间的Shuffle。Shuffle是数据发送到其他工作线程的步骤。在收集的情况下,每个工作线程都会将其发送到驱动程序。
因此,Spark在您的代码的JDBC中的步骤是:
(Workers)Conn -> (Workers)Query -> (Workers)FetchData -> (Driver)请求数据 -> (Workers)Shuffle -> (Driver)Collect
还有一些其他的Spark操作,例如QueryPlan、DataFrame构建和其他操作。
这就是您在Python的简单代码中比使用Spark更快的原因。

我们在从PostgreSQL加载数据到Spark时遇到了重大问题。基本上,我们的想法是将所有数据加载到驱动程序中的Pandas DataFrame中,然后将其转换为Spark DataFrame,然后运行Spark分布式。你有什么建议吗? - Sandeep
请勿将所有数据加载到Pandas中,这样做不好。如果您有一个Spark集群,应该使用Python的JDBC工具从postgres加载数据,直接将数据加载到workers中。https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrameReader.jdbc - Thiago Baldim

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接