Apache Spark - 是否可以使用依赖注入机制?

7

有没有可能在Spark应用程序中使用框架来启用/使用依赖注入?

例如,是否可以使用Guice?

如果可以的话,是否有文档或示例说明如何实现?

我正在使用Scala作为实现语言,Spark 2.2和SBT作为构建工具。

目前,我的团队和我正在使用Cake Pattern - 然而,它已经变得非常冗长,我们更喜欢Guice。那是一些更直观的东西,其他团队成员已经知道了。

4个回答

0

Neutrino框架正是符合您的需求。

免责声明:我是Neutrino框架的作者。

什么是Neutrino框架

它是一个基于Guice的依赖注入框架,专为Apache Spark设计,旨在减轻开发中的序列化工作。更具体地说,在对象传输和检查点恢复过程中,它将自动处理DI生成的对象的序列化/反序列化工作。

示例:

这里有一个简单的示例(根据Redis数据过滤事件流):

trait EventFilter[T] {
    def filter(t: T): Boolean
}

// The RedisEventFilter class depends on JedisCommands directly,
// and doesn't extend `java.io.Serializable` interface.
class RedisEventFilter @Inject()(jedis: JedisCommands)
extends EventFilter[ClickEvent] {
   override def filter(e: ClickEvent): Boolean = {
       // filter logic based on redis
   }
}

/* create injector */
val injector = ...

val eventFilter = injector.instance[EventFilter[ClickEvent]]
val eventStream: DStream[ClickEvent] = ...
eventStream.filter(e => eventFilter.filter(e))

以下是如何配置绑定:

class FilterModule(redisConfig: RedisConfig) extends SparkModule {
   override def configure(): Unit = {
       // the magic is here
       // The method `withSerializableProxy` will generate a proxy 
       // extending `EventFilter` and `java.io.Serializable` interfaces with Scala macro.
       // The module must extend `SparkModule` or `SparkPrivateModule` to get it
       bind[EventFilter[ClickEvent]].withSerializableProxy
           .to[RedisEventFilter].in[SingletonScope]
   }
}

使用中微子,RedisEventFilter甚至不需要关心序列化问题。一切都像在单个JVM中一样正常工作。

它如何在内部处理序列化问题

我们知道,为了采用DI框架,我们需要首先构建一个依赖图,描述各种类型之间的依赖关系。Guice使用Module API构建图形,而Spring框架使用XML文件或注释。 中微子基于Guice框架构建,并且当然使用guice模块API构建依赖图。它不仅将图形保留在驱动程序中,还在每个执行器上运行相同的图形。

serialize creation method

在依赖图中,一些节点可能会生成对象,这些对象可以传递给执行器,neutrino框架会为这些节点分配唯一的ID。由于每个JVM都有相同的图形,因此每个JVM上的图形具有相同的节点ID集。
在上面的示例中,neutrino生成了一个代理类,该类扩展了EventFilter。代理实例保存了图中的节点ID,该ID将被传递给执行器,以便在图中查找节点并相应地重新创建实例及其所有依赖项。

其他特性

作用域

由于每个执行器上都有一个图形,因此可以使用neutrino控制执行器上对象的生命周期/作用域,而这对于经典的DI方法来说是不可能的。 neutrino还提供了一些实用范围,例如每个JVM的单例,StreamingBatch范围等。

关键对象注入

一些关键的spark对象,如SparkContext、StreamingContext也是可注入的。
有关详细信息,请参阅neutrino自述文件

0

1
这个库是来自Databricks的吗? - Filipe Miranda

0

Spring Boot 提供了与各种系统的集成,包括 Spark、Hadoop、YARN、Kafka 和 JDBC 数据库。

例如,我有这个 application.properties

spring.main.web-environment=false

appName=spring-spark
sparkHome=/Users/username/Applications/spark-2.2.1-bin-hadoop2.7
masterUri=local

这是一个应用程序类

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
import org.springframework.context.support.PropertySourcesPlaceholderConfigurer;
import org.springframework.core.env.Environment;

@Configuration
@PropertySource("classpath:application.properties")
public class ApplicationConfig {
    @Autowired
    private Environment env;

    @Value("${appName:Spark Example}")
    private String appName;

    @Value("${sparkHome}")
    private String sparkHome;

    @Value("${masterUri:local}")
    private String masterUri;


    @Bean
    public SparkConf sparkConf() {
        return new SparkConf()
                .setAppName(appName)
                .setSparkHome(sparkHome)
                .setMaster(masterUri);
    }

    @Bean
    public JavaSparkContext javaSparkContext() {
        return new JavaSparkContext(sparkConf());
    }

    @Bean
    public SparkSession sparkSession() {
        SparkSession.Builder sparkBuilder = SparkSession.builder()
                .appName(appName)
                .master(masterUri)
                .sparkContext(javaSparkContext().sc());

        return sparkBuilder.getOrCreate();
    }

    @Bean
    public static PropertySourcesPlaceholderConfigurer propertySourcesPlaceholderConfigurer() {
        return new PropertySourcesPlaceholderConfigurer();
    }

}

taskContext.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">


    <!--List out all tasks here-->

    <bean id="exampleSparkTask" class="com.example.spark.task.SampleSparkTask">
        <constructor-arg ref="sparkSession" />
    </bean>

</beans>

App

@SpringBootApplication
@ImportResource("classpath:taskContext.xml")
public class App {

    public static void main(String[] args) {
        SpringApplication.run(App.class, args);

    }
}

这里实际上是运行Spark的Scala代码

@Order(1)
class SampleSparkTask(sparkSession: SparkSession) extends ApplicationRunner with Serializable {

  // for spark streaming
  @transient val ssc = new StreamingContext(sparkSession.sparkContext, Seconds(3))

  import sparkSession.implicits._

  @throws[Exception]
  override def run(args: ApplicationArguments): Unit = {
    // spark code here
  }
}

从那里,你可以定义一些@AutoWired的东西。


我在思考Guice...相比Guice,Spring有哪些优势? - Filipe Miranda
正如其他答案中提到的那样,Guice实际上并不是开箱即用的。该链接来自Salesforce,他们扩展了Guice,因为注入的对象无法序列化。我知道我们公司有一些团队正在使用Spark + SpringBoot,所以我可以假设它对这些事情的处理更加适当。 - OneCricketeer

0
当然可以!在Qwant.com中,我们使用Spark 1.6和Google Guice 4,在spark-submit bin上运行基于Java的程序,并在Hadoop YARN上运行。
如果您在Hadoop上运行Spark(通过HDP组件jar包),则已经存在guice,因此请注意您编译运行的版本。
 org.apache.spark:spark-yarn_2.10:1.6.3
|    +--- .....
|    +--- org.apache.hadoop:hadoop-yarn-server-web-proxy:2.2.0
|    |    +--- .....
|    |    +--- com.google.inject:guice:3.0 -> 4.2.2 (*)

Spark 1.6 带来了 Google Guice 3.0。

如果你想要“强制”使用 Google Guice 版本,你必须像这样使用(使用 Gradle):

shadowJar {
    relocate 'com.google.inject', 'shadow.com.google.inject'
}

https://imperceptiblethoughts.com/shadow/configuration/relocation/


嗨@ThomasDecaux,能否提供一个完整的与Guice集成的示例将会很不错。 - Filipe Miranda
无法完全提供(/因为企业私有), 但实际上并没有陷阱(除了序列化问题,如果您在工作器内使用Guice注入器,这是我极力反对的),只需确保您的代码与您编译的正确版本一起运行即可。 - Thomas Decaux

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接