有没有可能在Spark应用程序中使用框架来启用/使用依赖注入?
例如,是否可以使用Guice?
如果可以的话,是否有文档或示例说明如何实现?
我正在使用Scala作为实现语言,Spark 2.2和SBT作为构建工具。
目前,我的团队和我正在使用Cake Pattern - 然而,它已经变得非常冗长,我们更喜欢Guice。那是一些更直观的东西,其他团队成员已经知道了。
有没有可能在Spark应用程序中使用框架来启用/使用依赖注入?
例如,是否可以使用Guice?
如果可以的话,是否有文档或示例说明如何实现?
我正在使用Scala作为实现语言,Spark 2.2和SBT作为构建工具。
目前,我的团队和我正在使用Cake Pattern - 然而,它已经变得非常冗长,我们更喜欢Guice。那是一些更直观的东西,其他团队成员已经知道了。
Neutrino框架正是符合您的需求。
免责声明:我是Neutrino框架的作者。
它是一个基于Guice的依赖注入框架,专为Apache Spark设计,旨在减轻开发中的序列化工作。更具体地说,在对象传输和检查点恢复过程中,它将自动处理DI生成的对象的序列化/反序列化工作。
这里有一个简单的示例(根据Redis数据过滤事件流):
trait EventFilter[T] {
def filter(t: T): Boolean
}
// The RedisEventFilter class depends on JedisCommands directly,
// and doesn't extend `java.io.Serializable` interface.
class RedisEventFilter @Inject()(jedis: JedisCommands)
extends EventFilter[ClickEvent] {
override def filter(e: ClickEvent): Boolean = {
// filter logic based on redis
}
}
/* create injector */
val injector = ...
val eventFilter = injector.instance[EventFilter[ClickEvent]]
val eventStream: DStream[ClickEvent] = ...
eventStream.filter(e => eventFilter.filter(e))
以下是如何配置绑定:
class FilterModule(redisConfig: RedisConfig) extends SparkModule {
override def configure(): Unit = {
// the magic is here
// The method `withSerializableProxy` will generate a proxy
// extending `EventFilter` and `java.io.Serializable` interfaces with Scala macro.
// The module must extend `SparkModule` or `SparkPrivateModule` to get it
bind[EventFilter[ClickEvent]].withSerializableProxy
.to[RedisEventFilter].in[SingletonScope]
}
}
使用中微子,RedisEventFilter
甚至不需要关心序列化问题。一切都像在单个JVM中一样正常工作。
我们知道,为了采用DI框架,我们需要首先构建一个依赖图,描述各种类型之间的依赖关系。Guice使用Module API构建图形,而Spring框架使用XML文件或注释。 中微子基于Guice框架构建,并且当然使用guice模块API构建依赖图。它不仅将图形保留在驱动程序中,还在每个执行器上运行相同的图形。
EventFilter
。代理实例保存了图中的节点ID,该ID将被传递给执行器,以便在图中查找节点并相应地重新创建实例及其所有依赖项。
最近我一直在苦恼同样的问题。我的大部分发现是你会遇到序列化问题。
我在这里找到了一个不错的Guice解决方案: https://www.slideshare.net/databricks/dependency-injection-in-apache-spark-applications
Spring Boot 提供了与各种系统的集成,包括 Spark、Hadoop、YARN、Kafka 和 JDBC 数据库。
例如,我有这个 application.properties
spring.main.web-environment=false
appName=spring-spark
sparkHome=/Users/username/Applications/spark-2.2.1-bin-hadoop2.7
masterUri=local
这是一个应用程序类
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
import org.springframework.context.support.PropertySourcesPlaceholderConfigurer;
import org.springframework.core.env.Environment;
@Configuration
@PropertySource("classpath:application.properties")
public class ApplicationConfig {
@Autowired
private Environment env;
@Value("${appName:Spark Example}")
private String appName;
@Value("${sparkHome}")
private String sparkHome;
@Value("${masterUri:local}")
private String masterUri;
@Bean
public SparkConf sparkConf() {
return new SparkConf()
.setAppName(appName)
.setSparkHome(sparkHome)
.setMaster(masterUri);
}
@Bean
public JavaSparkContext javaSparkContext() {
return new JavaSparkContext(sparkConf());
}
@Bean
public SparkSession sparkSession() {
SparkSession.Builder sparkBuilder = SparkSession.builder()
.appName(appName)
.master(masterUri)
.sparkContext(javaSparkContext().sc());
return sparkBuilder.getOrCreate();
}
@Bean
public static PropertySourcesPlaceholderConfigurer propertySourcesPlaceholderConfigurer() {
return new PropertySourcesPlaceholderConfigurer();
}
}
taskContext.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
<!--List out all tasks here-->
<bean id="exampleSparkTask" class="com.example.spark.task.SampleSparkTask">
<constructor-arg ref="sparkSession" />
</bean>
</beans>
App
@SpringBootApplication
@ImportResource("classpath:taskContext.xml")
public class App {
public static void main(String[] args) {
SpringApplication.run(App.class, args);
}
}
这里实际上是运行Spark的Scala代码
@Order(1)
class SampleSparkTask(sparkSession: SparkSession) extends ApplicationRunner with Serializable {
// for spark streaming
@transient val ssc = new StreamingContext(sparkSession.sparkContext, Seconds(3))
import sparkSession.implicits._
@throws[Exception]
override def run(args: ApplicationArguments): Unit = {
// spark code here
}
}
从那里,你可以定义一些@AutoWired
的东西。
spark-submit
bin上运行基于Java的程序,并在Hadoop YARN
上运行。guice
,因此请注意您编译
和运行
的版本。 org.apache.spark:spark-yarn_2.10:1.6.3
| +--- .....
| +--- org.apache.hadoop:hadoop-yarn-server-web-proxy:2.2.0
| | +--- .....
| | +--- com.google.inject:guice:3.0 -> 4.2.2 (*)
Spark 1.6 带来了 Google Guice 3.0。
如果你想要“强制”使用 Google Guice
版本,你必须像这样使用(使用 Gradle):
shadowJar {
relocate 'com.google.inject', 'shadow.com.google.inject'
}
https://imperceptiblethoughts.com/shadow/configuration/relocation/