The neutrino framework is designed exactly for this requirement.
Disclaimer: I am the author of the neutrino framework.
What is the neutrino framework
It is a Guice-based dependency injection framework for Apache Spark, designed to relieve developers of serialization work. More specifically, it automatically handles the serialization/deserialization of DI-generated objects during object transmission and checkpoint recovery.
Example:
Here is a simple example (just filter an event stream based on Redis data):
trait EventFilter[T] {
  def filter(t: T): Boolean
}

// The RedisEventFilter class depends on JedisCommands directly,
// and doesn't extend the `java.io.Serializable` interface.
class RedisEventFilter @Inject()(jedis: JedisCommands)
  extends EventFilter[ClickEvent] {

  override def filter(e: ClickEvent): Boolean = {
    // filter logic based on redis
  }
}
/* create injector */
val injector = ...
val eventFilter = injector.instance[EventFilter[ClickEvent]]
val eventStream: DStream[ClickEvent] = ...
eventStream.filter(e => eventFilter.filter(e))
Here is how to configure the bindings:
class FilterModule(redisConfig: RedisConfig) extends SparkModule {
  override def configure(): Unit = {
    // The magic is here.
    // The method `withSerializableProxy` generates a proxy that extends
    // both the `EventFilter` and `java.io.Serializable` interfaces with a Scala macro.
    // The module must extend `SparkModule` or `SparkPrivateModule` to get it.
    bind[EventFilter[ClickEvent]].withSerializableProxy
      .to[RedisEventFilter].in[SingletonScope]
  }
}
With neutrino, the RedisEventFilter doesn't need to care about the serialization problem at all. Everything just works as in a single JVM.
How does it handle the serialization problem internally
As we know, to adopt a DI framework, we first need to build a dependency graph, which describes the dependency relationships between the various types. Guice builds the graph with its Module API, while the Spring framework uses XML files or annotations.
Neutrino is built on top of the Guice framework and, of course, builds the dependency graph with the Guice module API. It not only keeps the graph in the driver, but also runs the same graph on every executor.
In the dependency graph, some nodes may generate objects that are passed to the executors, and the neutrino framework assigns unique ids to these nodes. Since every JVM has the same graph, the graph on each JVM has the same node id set.
In the example above, neutrino generates a proxy class which extends EventFilter. The proxy instance holds the id of its node in the graph; the id is passed to the executors, where it is used to locate the node in the local graph and recreate the instance and all its dependencies.
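To make the mechanism concrete, here is a minimal plain-Scala sketch of the proxy idea, not the actual neutrino code: the proxy serializes only a node id, and the real instance is recreated from the local per-JVM graph on first use. The names `Graph`, `EventFilterProxy`, and the node id `1` are illustrative assumptions.

```scala
import java.io._

trait EventFilter[T] { def filter(t: T): Boolean }

// Stand-in for the per-JVM dependency graph: node id -> factory.
object Graph {
  val factories: Map[Int, () => EventFilter[String]] =
    Map(1 -> (() => new EventFilter[String] {
      def filter(s: String): Boolean = s.nonEmpty
    }))
}

class EventFilterProxy(nodeId: Int)
  extends EventFilter[String] with Serializable {

  // @transient: the underlying instance is never serialized; the lazy val
  // recreates it from the local graph the first time it is used on a JVM
  @transient private lazy val underlying: EventFilter[String] =
    Graph.factories(nodeId)()

  def filter(s: String): Boolean = underlying.filter(s)
}
```

Round-tripping an `EventFilterProxy` through Java serialization mimics shipping it to an executor: only the node id crosses the wire, and the deserialized copy rebuilds the real filter from its own JVM's graph.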
Other features
Scopes
Since the same graph exists on every executor, the object lifetime/scope on executors can be controlled with neutrino, which is impossible with the classic DI approach.
Neutrino also provides some utility scopes, such as singleton per JVM and a StreamingBatch scope.
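To illustrate what "singleton per JVM" means, here is a plain-Scala stand-in (not neutrino API): the object is constructed at most once per JVM, on first access, which is the guarantee such a scope gives DI-managed objects on each executor.

```scala
import java.util.concurrent.atomic.AtomicInteger

// A lazy val in an object is initialized at most once per JVM,
// mirroring the behavior of a "singleton per JVM" scope.
object PerJvmSingleton {
  val initCount = new AtomicInteger(0)

  lazy val instance: String = {
    initCount.incrementAndGet() // counts how many times we construct
    "shared-resource"
  }
}
```

Accessing `PerJvmSingleton.instance` repeatedly constructs it only once; a scoped binding extends the same guarantee to objects created by the injector.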
Key object injection
Some key Spark objects, such as SparkContext and StreamingContext, are also injectable.
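For example, a class could take SparkContext as an ordinary constructor dependency. The class name `BatchStats` and its method are illustrative, not part of neutrino; the sketch assumes a binding for `SparkContext` is available in the injector.

```scala
import javax.inject.Inject
import org.apache.spark.SparkContext

// Hypothetical example: SparkContext is injected like any other dependency.
class BatchStats @Inject()(sc: SparkContext) {
  def lineCount(path: String): Long = sc.textFile(path).count()
}
```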
For details, please refer to the neutrino readme file.