I am trying to figure out how Lagom can be used to consume data from external systems communicating over Kafka.
I've run into this section of the Lagom documentation, which describes how a Lagom service can communicate with another Lagom service by subscribing to its topic:
helloService
  .greetingsTopic()
  .subscribe // <-- you get back a Subscriber instance
  .atLeastOnce(
    Flow.fromFunction(doSomethingWithTheMessage)
  )
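Filled out, that snippet would look roughly like this; HelloService, GreetingMessage and the handler body are placeholder names of mine, not anything from my project:

import akka.Done
import akka.stream.scaladsl.Flow

// Rough expansion of the docs snippet; HelloService and GreetingMessage are
// placeholders standing in for whatever the producing service defines.
class HelloConsumer(helloService: HelloService) {

  private val doSomethingWithTheMessage: GreetingMessage => Done = { msg =>
    println(s"Received greeting: ${msg.message}")
    Done
  }

  helloService
    .greetingsTopic()
    .subscribe // Subscriber[GreetingMessage]
    .atLeastOnce(
      Flow.fromFunction(doSomethingWithTheMessage)
    )
}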
However, what is the appropriate configuration when you want to subscribe to a Kafka topic that contains events produced by some random external system?
Is some sort of adapter needed for this functionality? To clarify, I have this at the moment:
object Aggregator {
  val TOPIC_NAME = "my-aggregation"
}

trait Aggregator extends Service {
  def aggregate(correlationId: String): ServiceCall[Data, Done]

  def aggregationTopic(): Topic[DataRecorded]

  override final def descriptor: Descriptor = {
    import Service._

    named("aggregator")
      .withCalls(
        pathCall("/api/aggregate/:correlationId", aggregate _)
      )
      .withTopics(
        topic(Aggregator.TOPIC_NAME, aggregationTopic())
          .addProperty(
            KafkaProperties.partitionKeyStrategy,
            PartitionKeyStrategy[DataRecorded](_.sessionData.correlationId)
          )
      )
      .withAutoAcl(true)
  }
}
And I can invoke it via a simple POST request. However, I would like it to be invoked by consuming Data messages from some (external) Kafka topic.
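For reference, invoking it through the generated Lagom service client is equivalent to that POST; the caller class here is just a hypothetical sketch:

import akka.Done
import scala.concurrent.Future

// Hypothetical caller using the Lagom service client; "aggregator" would be
// obtained via serviceClient.implement[Aggregator] in the application wiring.
class AggregatorCaller(aggregator: Aggregator) {
  def record(correlationId: String, data: Data): Future[Done] =
    aggregator.aggregate(correlationId).invoke(data)
}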
I was wondering if there is such a way to configure the descriptor in a fashion similar to this mockup:
override final def descriptor: Descriptor = {
  ...
  kafkaTopic("my-input-topic")
    .subscribe(serviceCall(aggregate _))
    .withAtMostOnceDelivery
}
I've run into this discussion on Google Groups, but in the OP's questions I do not see them actually doing anything with the EventMessages coming from some-topic, except routing them to the topic defined by their service.
EDIT #1: Progress update
Looking at the documentation, I decided to try the following approach.
I added two more modules, aggregator-kafka-proxy-api and aggregator-kafka-proxy-impl.
In the new api module, I defined a new service with no methods, but with one topic representing my Kafka topic:
object DataKafkaPublisher {
  val TOPIC_NAME = "data-in"
}

trait DataKafkaPublisher extends Service {
  def dataInTopic: Topic[DataPublished]

  override final def descriptor: Descriptor = {
    import Service._
    import DataKafkaPublisher._

    named("data-kafka-in")
      .withTopics(
        topic(TOPIC_NAME, dataInTopic)
          .addProperty(
            KafkaProperties.partitionKeyStrategy,
            PartitionKeyStrategy[DataPublished](_.data.correlationId)
          )
      )
      .withAutoAcl(true)
  }
}
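The topic's message type lives in the same api module; it looks something like this (the single data field is the relevant part, and Lagom derives the topic's serializer from the implicit play-json Format):

import play.api.libs.json.{Format, Json}

// Message class for the "data-in" topic; Lagom builds the topic's
// MessageSerializer from the implicit Format.
// Assumes Data itself has an implicit Format in scope.
case class DataPublished(data: Data)

object DataPublished {
  implicit val format: Format[DataPublished] = Json.format[DataPublished]
}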
In the impl module, I simply did the standard implementation:
class DataKafkaPublisherImpl(persistentEntityRegistry: PersistentEntityRegistry) extends DataKafkaPublisher {

  override def dataInTopic: Topic[api.DataPublished] =
    TopicProducer.singleStreamWithOffset { fromOffset =>
      persistentEntityRegistry.eventStream(KafkaDataEvent.Tag, fromOffset)
        .map(ev => (convertEvent(ev), ev.offset))
    }

  private def convertEvent(evt: EventStreamElement[KafkaDataEvent]): api.DataPublished = {
    evt.event match {
      case DataPublished(data) => api.DataPublished(data)
    }
  }
}
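The persisted event behind this producer is not shown above; a minimal sketch of what it assumes (a single tag for singleStreamWithOffset, and an entity-level DataPublished event that is distinct from the api-level message of the same name):

import com.lightbend.lagom.scaladsl.persistence.{AggregateEvent, AggregateEventTag}

// Assumed shape of the persisted event that the TopicProducer reads.
sealed trait KafkaDataEvent extends AggregateEvent[KafkaDataEvent] {
  override def aggregateTag: AggregateEventTag[KafkaDataEvent] = KafkaDataEvent.Tag
}

object KafkaDataEvent {
  // single tag, matching TopicProducer.singleStreamWithOffset
  val Tag: AggregateEventTag[KafkaDataEvent] = AggregateEventTag[KafkaDataEvent]
}

// entity-level event, converted to api.DataPublished in convertEvent above
case class DataPublished(data: Data) extends KafkaDataEvent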
Now, to actually consume these events, in my aggregator-impl module I added a "subscriber" service which takes these events and invokes the appropriate commands on the entity.
class DataKafkaSubscriber(persistentEntityRegistry: PersistentEntityRegistry, kafkaPublisher: DataKafkaPublisher) {

  kafkaPublisher.dataInTopic.subscribe.atLeastOnce(
    Flow[DataPublished].mapAsync(1) { sd =>
      sessionRef(sd.data.correlationId).ask(RecordData(sd.data))
    }
  )

  private def sessionRef(correlationId: String) =
    persistentEntityRegistry.refFor[Entity](correlationId)
}
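For the subscription to actually start, the subscriber has to be instantiated eagerly when the application is wired. A minimal sketch of that wiring, assuming macwire, Cassandra persistence and the standard Kafka components (AggregatorImpl and the class names are mine; the serializer registry and service locator are left to the concrete loader):

import com.lightbend.lagom.scaladsl.broker.kafka.LagomKafkaComponents
import com.lightbend.lagom.scaladsl.persistence.cassandra.CassandraPersistenceComponents
import com.lightbend.lagom.scaladsl.server._
import com.softwaremill.macwire._
import play.api.libs.ws.ahc.AhcWSComponents

// Sketch of the aggregator-impl application wiring; the Cassandra backend
// and AggregatorImpl are assumptions.
abstract class AggregatorApplication(context: LagomApplicationContext)
    extends LagomApplication(context)
    with CassandraPersistenceComponents
    with LagomKafkaComponents
    with AhcWSComponents {

  override lazy val lagomServer: LagomServer =
    serverFor[Aggregator](wire[AggregatorImpl])

  // client for the proxy service's topic, resolved via the service locator
  lazy val dataKafkaPublisher: DataKafkaPublisher =
    serviceClient.implement[DataKafkaPublisher]

  // a strict val (not lazy) so the constructor-side subscription starts at boot
  val dataKafkaSubscriber: DataKafkaSubscriber = wire[DataKafkaSubscriber]
}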
This effectively allowed me to publish a message on the Kafka topic "data-in", which was then proxied and converted to a RecordData command before being issued to the entity to consume.
However, it seems somewhat hacky to me. I am coupled to Kafka by Lagom internals, and I cannot swap the source of my data easily. For example, how would I consume external messages from RabbitMQ if I wanted to? What if I'm trying to consume from another Kafka cluster (a different one than the one used by Lagom)?
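One option I'm considering to keep that boundary swappable is to bypass the Lagom broker API entirely and consume the external topic with plain Alpakka Kafka, forwarding each record into the existing service call. A rough sketch, where the bootstrap servers, group id, topic name and the JSON decoding of Data are all assumptions:

import akka.actor.ActorSystem
import akka.kafka.scaladsl.{Committer, Consumer}
import akka.kafka.{CommitterSettings, ConsumerSettings, Subscriptions}
import akka.stream.Materializer
import org.apache.kafka.common.serialization.StringDeserializer
import play.api.libs.json.Json
import scala.concurrent.ExecutionContext

// Sketch: consume an external (non-Lagom) Kafka topic with Alpakka Kafka and
// forward each record to the existing aggregate service call.
class ExternalDataConsumer(aggregator: Aggregator)(
    implicit system: ActorSystem, mat: Materializer, ec: ExecutionContext) {

  private val consumerSettings =
    ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
      .withBootstrapServers("external-kafka:9092") // assumption: the external cluster
      .withGroupId("aggregator-external")          // assumption

  Consumer
    .committableSource(consumerSettings, Subscriptions.topics("my-input-topic"))
    .mapAsync(1) { msg =>
      // assumes Data has a play-json Reads in scope
      val data = Json.parse(msg.record.value).as[Data]
      aggregator.aggregate(data.correlationId).invoke(data).map(_ => msg.committableOffset)
    }
    .runWith(Committer.sink(CommitterSettings(system)))
}

The same shape would work for RabbitMQ by swapping the source for an Alpakka AMQP one, since the only coupling to my service is the aggregate call at the end.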
EDIT #2: More docs
I've found a few articles in the Lagom docs, notably this:
Consuming Topics from 3rd parties
You may want your Lagom service to consume data produced on services not implemented in Lagom. In that case, as described in the Service Clients section, you can create a third-party-service-api module in your Lagom project. That module will contain a Service Descriptor declaring the topic you will consume from. Once you have your ThirdPartyService interface and related classes implemented, you should add third-party-service-api as a dependency on your fancy-service-impl. Finally, you can consume from the topic described in ThirdPartyService as documented in the Subscribe to a topic section.
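Based on that passage, I understand the third-party-service-api module would hold something like the following descriptor; the names are placeholders, Data is reused from above, and the real work is making the message serializer match whatever the external producer writes:

import com.lightbend.lagom.scaladsl.api.broker.Topic
import com.lightbend.lagom.scaladsl.api.{Descriptor, Service}

// Placeholder descriptor for the externally produced topic; this service is
// never implemented, it only describes the topic so Lagom can build a client.
trait ThirdPartyService extends Service {

  def externalDataTopic: Topic[Data]

  override final def descriptor: Descriptor = {
    import Service._
    named("third-party-service")
      .withTopics(
        topic("my-input-topic", externalDataTopic)
      )
  }
}

As I read it, the impl module then depends on third-party-service-api and subscribes exactly like in the snippet at the top, via serviceClient.implement[ThirdPartyService].externalDataTopic.subscribe.atLeastOnce(...).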
akka-streams comes with Lagom, and it has occurred to me that I might use it on the in/out boundaries, but I want to know if there is a recommended, idiomatic way to do this. Nevertheless, I'll try your approach! – Jumna