使用SparkR JVM调用来自Scala jar文件的方法

12

我想要能够将DataFrame打包到Scala jar文件中,并在R中访问它们。最终目标是创建一种方法,可以在Python、R和Scala中访问特定且经常使用的数据库表,而不需要为每个语言编写一个不同的库。

为了实现这一目标,我在Scala中制作了一个jar文件,其中包含使用SparkSQL库查询数据库并获取我想要的DataFrames的函数。我希望能够在R中调用这些函数,而无需创建另一个JVM,因为Spark已经在R中运行在一个JVM上。但是,Spark使用的JVM在SparkR API中没有暴露出来。为了使其可访问并使Java方法可调用,我修改了SparkR包中的"backend.R"、"generics.R"、"DataFrame.R"和"NAMESPACE":

在"backend.R"中,我将"callJMethod"和"createJObject"定义为正式方法:

  setMethod("callJMethod", signature(objId="jobj", methodName="character"), function(objId, methodName, ...) {
  stopifnot(class(objId) == "jobj")
  if (!isValidJobj(objId)) {
    stop("Invalid jobj ", objId$id,
         ". If SparkR was restarted, Spark operations need to be re-executed.")
  }
  invokeJava(isStatic = FALSE, objId$id, methodName, ...)
})


  setMethod("newJObject", signature(className="character"), function(className, ...) {
  invokeJava(isStatic = TRUE, className, methodName = "<init>", ...)
})

我修改了 "generics.R" 文件,添加了这些函数:

#' @rdname callJMethod
#' @export
setGeneric("callJMethod", function(objId, methodName, ...) { standardGeneric("callJMethod")})

#' @rdname newJobject
#' @export
setGeneric("newJObject", function(className, ...) {standardGeneric("newJObject")})

然后我在NAMESPACE文件中为这些函数添加了导出:

export("cacheTable",
   "clearCache",
   "createDataFrame",
   "createExternalTable",
   "dropTempTable",
   "jsonFile",
   "loadDF",
   "parquetFile",
   "read.df",
   "sql",
   "table",
   "tableNames",
   "tables",
   "uncacheTable",
   "callJMethod",
   "newJObject")

这使我能够在不启动新的JVM的情况下调用我编写的Scala函数。

我编写的Scala方法返回DataFrames,在返回时是R中的"jobj",但SparkR DataFrame是一个环境+一个jobj。为了将这些jobj DataFrames转换为SparkR DataFrames,我使用了"DataFrame.R"中的dataFrame()函数,并按照上述步骤使其可访问。

然后,我能够从R中访问我从Scala中构建的DataFrame,并在该DataFrame上使用所有SparkR的函数。我想知道是否有更好的方法来创建这样的跨语言库,或者是否存在让Spark JVM公开的任何原因?

1个回答

4

Spark JVM是否应该公开?

可能有不止一个原因。Spark开发人员努力提供稳定的公共API。实现的低级细节,包括客户语言与JVM通信的方式,根本不是合同的一部分。它可以在任何时候完全重写,而不会对用户产生任何负面影响。如果您决定使用它并且存在向后不兼容的更改,则需要自己解决。

保持内部私有化减少了维护和支持软件的工作量。您只需不必自己烦恼用户可能滥用这些功能的所有可能方式。

制作跨语言库的更好方法

如果不了解您的用例,很难说。我至少看到三个选项:

  • 首先,R语言提供的访问控制机制较弱。如果API的任何部分是内部的,您总可以使用:::函数来访问它。就像聪明人所说:

    在您的代码中使用:::通常是一种设计错误,因为相应的对象可能已经被保留为内部对象,有着很好的理由。

    但有一点可以肯定,这比修改Spark源代码要好得多。作为一个额外的奖励,它清楚地标记了您的代码中特别脆弱和潜在不稳定的部分。

  • if all you want is to create DataFrames the simplest thing is to use raw SQL. It is clean, portable, requires no compilation, packaging and simply works. Assuming you have query string like below stored in the variable named q

    CREATE TEMPORARY TABLE foo
    USING org.apache.spark.sql.jdbc
    OPTIONS (
        url "jdbc:postgresql://localhost/test",
        dbtable "public.foo",
        driver "org.postgresql.Driver"
    )
    

    it can be used in R:

    sql(sqlContext, q)
    fooDF <- sql(sqlContext, "SELECT * FROM foo")
    

    Python:

    sqlContext.sql(q)
    fooDF = sqlContext.sql("SELECT * FROM foo")
    

    Scala:

    sqlContext.sql(q)
    val fooDF = sqlContext.sql("SELECT * FROM foo")
    

    or directly in Spark SQL.

  • finally you can use Spark Data Sources API for consistent and supported cross-platform access.

在这三种选择中,我更喜欢原始的SQL,其次是数据源API用于复杂情况,最后才考虑使用内部机制。

编辑 (2016-08-04):

如果你对JVM的底层访问感兴趣,那么相对较新的rstudio/sparkapi包可以暴露出SparkR RPC协议的内部细节。很难预测它将如何发展,因此请自行决定是否使用。


但是有没有一种好的方法可以在R和Scala之间共享DataFrames,而不必将数据存储并重新读取?除非我漏掉了什么,否则解决方案2和3似乎都需要这样做。 - shj
我不确定你想要什么。Spark不支持每个JVM多个上下文(参见SPARK-2243)也不支持在上下文之间共享RDD。因此,所有三种情况都需要从某种类型的存储中读取数据。有一些选择,例如spark-jobserver,在Ignite之上的“共享”RDD(https://ignite.apache.org/features/igniterdd.html)或使用[Tachyon](http://tachyon-project.org/)作为内存存储层来尝试解决这个问题,但简单地暴露JVM是行不通的。 - zero323
对于简单的 SQL 查询,您可以使用内置的 Thrift 服务器 - zero323

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接