我想要能够将DataFrame打包到Scala jar文件中,并在R中访问它们。最终目标是创建一种方法,可以在Python、R和Scala中访问特定且经常使用的数据库表,而不需要为每个语言编写一个不同的库。
为了实现这一目标,我在Scala中制作了一个jar文件,其中包含使用SparkSQL库查询数据库并获取我想要的DataFrames的函数。我希望能够在R中调用这些函数,而无需创建另一个JVM,因为Spark已经在R中运行在一个JVM上。但是,Spark使用的JVM在SparkR API中没有暴露出来。为了使其可访问并使Java方法可调用,我修改了SparkR包中的"backend.R"、"generics.R"、"DataFrame.R"和"NAMESPACE":
在"backend.R"中,我将"callJMethod"和"createJObject"定义为正式方法:
setMethod("callJMethod", signature(objId="jobj", methodName="character"), function(objId, methodName, ...) {
stopifnot(class(objId) == "jobj")
if (!isValidJobj(objId)) {
stop("Invalid jobj ", objId$id,
". If SparkR was restarted, Spark operations need to be re-executed.")
}
invokeJava(isStatic = FALSE, objId$id, methodName, ...)
})
setMethod("newJObject", signature(className="character"), function(className, ...) {
invokeJava(isStatic = TRUE, className, methodName = "<init>", ...)
})
我修改了 "generics.R" 文件,添加了这些函数:
#' @rdname callJMethod
#' @export
setGeneric("callJMethod", function(objId, methodName, ...) { standardGeneric("callJMethod")})
#' @rdname newJobject
#' @export
setGeneric("newJObject", function(className, ...) {standardGeneric("newJObject")})
然后我在NAMESPACE文件中为这些函数添加了导出:
export("cacheTable",
"clearCache",
"createDataFrame",
"createExternalTable",
"dropTempTable",
"jsonFile",
"loadDF",
"parquetFile",
"read.df",
"sql",
"table",
"tableNames",
"tables",
"uncacheTable",
"callJMethod",
"newJObject")
这使我能够在不启动新的JVM的情况下调用我编写的Scala函数。
我编写的Scala方法返回DataFrames,在返回时是R中的"jobj",但SparkR DataFrame是一个环境+一个jobj。为了将这些jobj DataFrames转换为SparkR DataFrames,我使用了"DataFrame.R"中的dataFrame()函数,并按照上述步骤使其可访问。
然后,我能够从R中访问我从Scala中构建的DataFrame,并在该DataFrame上使用所有SparkR的函数。我想知道是否有更好的方法来创建这样的跨语言库,或者是否存在让Spark JVM公开的任何原因?