Apache Spark - Is it possible to use a Dependency Injection Mechanism

Is it possible to use a framework to enable dependency injection in a Spark application?

Is it possible to use Guice, for instance?

If so, is there any documentation, or samples of how to do it?

I am using Scala as the implementation language, Spark 2.2, and SBT as the build tool.

At the moment, my team and I are using the Cake Pattern. It has, however, become quite verbose, and we would prefer Guice, which is more intuitive and already familiar to the other team members.

Tonsillotomy answered 19/11, 2017 at 20:1 Comment(0)

I've been struggling with the same problem recently. Most of what I found is that you'll face serialization issues.

I found a nice solution with Guice here: https://www.slideshare.net/databricks/dependency-injection-in-apache-spark-applications
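
To make the problem concrete, here is a minimal sketch of the issue and one common workaround (all names here, such as UserLookup and AppModule, are hypothetical and not taken from the slides): a Guice-built instance that is not Serializable cannot be captured in a Spark closure, but it can be rebuilt on the executor side, for example inside mapPartitions.

import com.google.inject.{AbstractModule, Guice}
import org.apache.spark.sql.SparkSession

// Hypothetical service, deliberately NOT Serializable
class UserLookup {
  def isKnown(id: Long): Boolean = id % 2 == 0 // stand-in logic
}

class AppModule extends AbstractModule {
  override def configure(): Unit = {
    bind(classOf[UserLookup]) // untargetted binding; Guice builds it via the no-arg constructor
  }
}

object GuiceSparkSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("guice-spark-sketch").master("local[*]").getOrCreate()
    import spark.implicits._

    // Building UserLookup on the driver and using it inside the closure below would fail with
    // "Task not serializable", because UserLookup is not Serializable.
    // Workaround: rebuild the injected service inside the partition function, so that
    // only the lightweight closure is shipped to the executors.
    val known = spark.createDataset(Seq(1L, 2L, 3L, 4L)).mapPartitions { ids =>
      val lookup = Guice.createInjector(new AppModule).getInstance(classOf[UserLookup])
      ids.filter(lookup.isKnown)
    }

    println(known.count())
    spark.stop()
  }
}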

Featherbedding answered 14/12, 2017 at 9:27 Comment(1)
Where is this library from Databricks? - Tonsillotomy

Spring Boot offers integration with various systems including Spark, Hadoop, YARN, Kafka, and JDBC databases.

For example, I have this application.properties:

spring.main.web-environment=false

appName=spring-spark
sparkHome=/Users/username/Applications/spark-2.2.1-bin-hadoop2.7
masterUri=local

And this is the application configuration class:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
import org.springframework.context.support.PropertySourcesPlaceholderConfigurer;
import org.springframework.core.env.Environment;

@Configuration
@PropertySource("classpath:application.properties")
public class ApplicationConfig {
    @Autowired
    private Environment env;

    @Value("${appName:Spark Example}")
    private String appName;

    @Value("${sparkHome}")
    private String sparkHome;

    @Value("${masterUri:local}")
    private String masterUri;


    @Bean
    public SparkConf sparkConf() {
        return new SparkConf()
                .setAppName(appName)
                .setSparkHome(sparkHome)
                .setMaster(masterUri);
    }

    @Bean
    public JavaSparkContext javaSparkContext() {
        return new JavaSparkContext(sparkConf());
    }

    @Bean
    public SparkSession sparkSession() {
        SparkSession.Builder sparkBuilder = SparkSession.builder()
                .appName(appName)
                .master(masterUri)
                .sparkContext(javaSparkContext().sc());

        return sparkBuilder.getOrCreate();
    }

    @Bean
    public static PropertySourcesPlaceholderConfigurer propertySourcesPlaceholderConfigurer() {
        return new PropertySourcesPlaceholderConfigurer();
    }

}

taskContext.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">


    <!--List out all tasks here-->

    <bean id="exampleSparkTask" class="com.example.spark.task.SampleSparkTask">
        <constructor-arg ref="sparkSession" />
    </bean>

</beans>

App

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ImportResource;

@SpringBootApplication
@ImportResource("classpath:taskContext.xml")
public class App {

    public static void main(String[] args) {
        SpringApplication.run(App.class, args);
    }
}

And here is the Scala code that actually runs the Spark job:

import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.springframework.boot.{ApplicationArguments, ApplicationRunner}
import org.springframework.core.annotation.Order

@Order(1)
class SampleSparkTask(sparkSession: SparkSession) extends ApplicationRunner with Serializable {

  // for spark streaming
  @transient val ssc = new StreamingContext(sparkSession.sparkContext, Seconds(3))

  import sparkSession.implicits._

  @throws[Exception]
  override def run(args: ApplicationArguments): Unit = {
    // spark code here
  }
}

From there, you can define beans and inject them with @Autowired.
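
For example, a hypothetical service bean could be injected like this (RowCounter and its logic are made up for illustration, and the class has to live in a package covered by the @SpringBootApplication component scan):

import org.apache.spark.sql.SparkSession
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Component

// Spring resolves the SparkSession bean defined in ApplicationConfig and
// passes it to the constructor; field injection with an @Autowired var
// works too, but constructor injection keeps the bean immutable.
@Component
class RowCounter @Autowired()(sparkSession: SparkSession) {
  def countRange(n: Long): Long = sparkSession.range(n).count()
}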

Obe answered 14/3, 2018 at 4:47 Comment(2)
I was thinking about Guice... any advantages of Spring over Guice? - Tonsillotomy
As linked in the other answer, Guice doesn't actually work out of the box. The linked slides describe how Salesforce extended Guice because the injected objects cannot be serialized. I know of a few teams at my company that are using Spark + Spring Boot, so I assume it works more smoothly for this. - Obe

Of course you can! At Qwant.com we use Spark 1.6 with Google Guice 4, running Java programs on Hadoop YARN via the spark-submit binary.

Guice is already on the classpath if you run Spark on Hadoop (via the HDP assembly jar), so pay attention to the version you compile against and the version you actually run:

 org.apache.spark:spark-yarn_2.10:1.6.3
|    +--- .....
|    +--- org.apache.hadoop:hadoop-yarn-server-web-proxy:2.2.0
|    |    +--- .....
|    |    +--- com.google.inject:guice:3.0 -> 4.2.2 (*)

Spark 1.6 brings Google Guice 3.0.
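
One quick way to see which Guice actually wins at runtime is to print where the Guice class was loaded from (plain JDK reflection, nothing Spark- or Hadoop-specific):

// Prints the jar that com.google.inject.Guice was loaded from,
// i.e. the Guice version the job really runs with.
val guiceJar = Option(classOf[com.google.inject.Guice].getProtectionDomain.getCodeSource)
  .map(_.getLocation)
println(s"Guice loaded from: $guiceJar")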

If you want to "force" the version of Google Guice, you must relocate (shade) it, for example like this with Gradle:

shadowJar {
    relocate 'com.google.inject', 'shadow.com.google.inject'
}

https://imperceptiblethoughts.com/shadow/configuration/relocation/
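
Since the question uses SBT, a rough equivalent with the sbt-assembly plugin's shade rules could look like this (this assumes sbt-assembly is already set up; newer plugin versions spell the key as `assembly / assemblyShadeRules`):

// build.sbt, with the sbt-assembly plugin enabled
assemblyShadeRules in assembly := Seq(
  // relocate the bundled Guice so it cannot clash with the Guice that Hadoop/Spark ships
  ShadeRule.rename("com.google.inject.**" -> "shadow.com.google.inject.@1").inAll
)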

Billowy answered 4/3, 2019 at 12:27 Comment(2)
Hi @ThomasDecaux - it would be nice to see a full example of this integration with Guice. - Tonsillotomy
A full one I can't :/ (it's private enterprise code), but there is really no trap (apart from the serialization issue if you use the Guice injector inside a worker, which I strongly discourage); just make sure your code runs with the same version you compiled against. - Billowy

The neutrino framework is designed exactly for this requirement.

Disclaimer: I am the author of the neutrino framework.

What is the neutrino framework

It is a Guice-based dependency injection framework for Apache Spark, designed to relieve developers of the serialization work. More specifically, it automatically handles serialization/deserialization of the DI-generated objects during object transmission and checkpoint recovery.

Example:

Here is a simple example (it just filters an event stream based on Redis data):

trait EventFilter[T] {
    def filter(t: T): Boolean
}

// The RedisEventFilter class depends on JedisCommands directly,
// and doesn't extend the `java.io.Serializable` interface.
class RedisEventFilter @Inject()(jedis: JedisCommands)
extends EventFilter[ClickEvent] {
   override def filter(e: ClickEvent): Boolean = {
       // filter logic based on redis
   }
}

/* create injector */
val injector = ...

val eventFilter = injector.instance[EventFilter[ClickEvent]]
val eventStream: DStream[ClickEvent] = ...
eventStream.filter(e => eventFilter.filter(e))

Here is how to configure the bindings:

class FilterModule(redisConfig: RedisConfig) extends SparkModule {
   override def configure(): Unit = {
       // the magic is here
       // The method `withSerializableProxy` will generate a proxy
       // extending the `EventFilter` and `java.io.Serializable` interfaces with a Scala macro.
       // The module must extend `SparkModule` or `SparkPrivateModule` to get it.
       bind[EventFilter[ClickEvent]].withSerializableProxy
           .to[RedisEventFilter].in[SingletonScope]
   }
}

With neutrino, RedisEventFilter doesn't even need to care about the serialization problem. Everything just works as if it were running in a single JVM.

How does it handle the serialization problem internally

As we know, to adopt a DI framework, we first need to build a dependency graph that describes the dependency relationships between the various types. Guice uses its Module API to build the graph, while the Spring framework uses XML files or annotations. Neutrino is built on top of Guice and, of course, builds the dependency graph with the Guice module API. It not only keeps the graph in the driver, but also runs the same graph on every executor.
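
For comparison, the same binding with the classic Guice Module API would look roughly like this (plain Guice, no serialization handling; it reuses the EventFilter types from the example above):

import com.google.inject.{AbstractModule, TypeLiteral}

// Classic Guice: the binding itself is simple, but the injected RedisEventFilter
// would still have to be made serializable before it could travel to executors.
class PlainFilterModule extends AbstractModule {
  override def configure(): Unit = {
    bind(new TypeLiteral[EventFilter[ClickEvent]]() {})
      .to(classOf[RedisEventFilter])
  }
}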

Serializing the creation method

In the dependency graph, some nodes may generate objects that are passed to the executors, and the neutrino framework assigns a unique id to each of these nodes. Since every JVM has the same graph, the set of node ids is identical on every JVM. In the example above, neutrino generates a proxy class that extends EventFilter. The proxy instance holds the node id; the id is passed to the executors, where it is used to find the node in the local graph and recreate the instance and all of its dependencies.
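
The idea can be sketched conceptually like this (this is not neutrino's actual code; GraphRegistry and the class names are made up purely to illustrate shipping a node id instead of the object itself):

import scala.collection.concurrent.TrieMap

// A hypothetical per-JVM registry standing in for the dependency graph
// that neutrino keeps on every executor.
object GraphRegistry {
  private val nodes = TrieMap.empty[String, () => Any]
  def register(id: String, factory: () => Any): Unit = nodes.put(id, factory)
  def resolve[T](id: String): T = nodes(id)().asInstanceOf[T]
}

// The proxy is serializable and carries only the node id; the real filter is
// re-resolved lazily after deserialization on the executor.
class EventFilterProxy[T](nodeId: String) extends EventFilter[T] with Serializable {
  @transient private lazy val real: EventFilter[T] = GraphRegistry.resolve[EventFilter[T]](nodeId)
  override def filter(t: T): Boolean = real.filter(t)
}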

Other features

Scopes

Since there is a graph on every executor, the object lifetime/scope on executors can be controlled with neutrino, which is impossible with the classic DI approach. Neutrino also provides some utility scopes, such as singleton per JVM and a StreamingBatch scope.

Key object injection

Some key Spark objects, such as SparkContext and StreamingContext, are also injectable.

For details, please refer to the neutrino readme file.

Commix answered 5/5, 2022 at 2:53 Comment(0)
