Lagom service consuming input from Kafka

I am trying to figure out how Lagom can be used to consume data from external systems communicating over Kafka.

I've run into this section of the Lagom documentation, which describes how a Lagom service can communicate with another Lagom service by subscribing to its topic.

helloService
  .greetingsTopic()
  .subscribe // <-- you get back a Subscriber instance
  .atLeastOnce(
    Flow.fromFunction(doSomethingWithTheMessage)
  )

However, what is the appropriate configuration when you want to subscribe to a Kafka topic that contains events produced by some random, external system?

Is some sort of adapter needed for this functionality? To clarify, I have this at the moment:

object Aggregator {
  val TOPIC_NAME = "my-aggregation"
}

trait Aggregator extends Service {
  def aggregate(correlationId: String): ServiceCall[Data, Done]

  def aggregationTopic(): Topic[DataRecorded]

  override final def descriptor: Descriptor = {
    import Service._

    named("aggregator")
      .withCalls(
        pathCall("/api/aggregate/:correlationId", aggregate _)
      )
      .withTopics(
        topic(Aggregator.TOPIC_NAME, aggregationTopic())
          .addProperty(
            KafkaProperties.partitionKeyStrategy,
            PartitionKeyStrategy[DataRecorded](_.sessionData.correlationId)
          )
      )
      .withAutoAcl(true)
  }
}

And I can invoke it via a simple POST request. However, I would like it to be invoked by consuming Data messages from some (external) Kafka topic.

I was wondering if there is such a way to configure the descriptor in a fashion similar to this mockup:

override final def descriptor: Descriptor = {
  ...
  kafkaTopic("my-input-topic")
    .subscribe(serviceCall(aggregate _))
    .withAtMostOnceDelivery
}

I've run into this discussion on Google Groups, but in the OP's question I don't see that he is actually doing anything with the EventMessages coming from some-topic, except routing them to the topic defined by his service.

EDIT #1: Progress update

Looking at the documentation, I decided to try the following approach. I added 2 more modules, aggregator-kafka-proxy-api and aggregator-kafka-proxy-impl.

In the new api module, I defined a new service with no methods, but with one topic that represents my Kafka topic:

object DataKafkaPublisher {
  val TOPIC_NAME = "data-in"
}

trait DataKafkaPublisher extends Service {
  def dataInTopic: Topic[DataPublished]

  override final def descriptor: Descriptor = {
    import Service._
    import DataKafkaPublisher._

    named("data-kafka-in")
      .withTopics(
        topic(TOPIC_NAME, dataInTopic)
          .addProperty(
            KafkaProperties.partitionKeyStrategy,
            PartitionKeyStrategy[DataPublished](_.data.correlationId)
          )
      )
      .withAutoAcl(true)
  }
}

In the impl module, I simply provided the standard implementation:

class DataKafkaPublisherImpl(persistentEntityRegistry: PersistentEntityRegistry) extends DataKafkaPublisher {
  override def dataInTopic: Topic[api.DataPublished] =
    TopicProducer.singleStreamWithOffset {
      fromOffset =>
        persistentEntityRegistry.eventStream(KafkaDataEvent.Tag, fromOffset)
          .map(ev => (convertEvent(ev), ev.offset))
    }

  private def convertEvent(evt: EventStreamElement[KafkaDataEvent]): api.DataPublished = {
    evt.event match {
      case DataPublished(data) => api.DataPublished(data)
    }
  }
}

Now, to actually consume these events, I added a "subscriber" service to my aggregator-impl module, which takes these events and invokes the appropriate commands on the entity.

class DataKafkaSubscriber(persistentEntityRegistry: PersistentEntityRegistry, kafkaPublisher: DataKafkaPublisher) {

  kafkaPublisher.dataInTopic.subscribe.atLeastOnce(
    Flow[DataPublished].mapAsync(1) { sd =>
      sessionRef(sd.data.correlationId).ask(RecordData(sd.data))
    }
  )

  private def sessionRef(correlationId: String) =
    persistentEntityRegistry.refFor[Entity](correlationId)
}
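
For this subscriber to actually start consuming, it has to be instantiated when the service starts. A hedged sketch of how that wiring could look in the application loader's cake, assuming macwire and Cassandra persistence as in the Lagom seed templates (AggregatorImpl and AggregatorSerializerRegistry are assumed to exist elsewhere in the module):

import com.lightbend.lagom.scaladsl.broker.kafka.LagomKafkaComponents
import com.lightbend.lagom.scaladsl.persistence.cassandra.CassandraPersistenceComponents
import com.lightbend.lagom.scaladsl.playjson.JsonSerializerRegistry
import com.lightbend.lagom.scaladsl.server.{LagomApplication, LagomApplicationContext, LagomServer}
import com.softwaremill.macwire._
import play.api.libs.ws.ahc.AhcWSComponents

abstract class AggregatorApplication(context: LagomApplicationContext)
    extends LagomApplication(context)
    with CassandraPersistenceComponents
    with LagomKafkaComponents
    with AhcWSComponents {

  override lazy val lagomServer: LagomServer = serverFor[Aggregator](wire[AggregatorImpl])
  override lazy val jsonSerializerRegistry: JsonSerializerRegistry = AggregatorSerializerRegistry

  // Lagom Broker API client for the proxy API that describes the "data-in" topic
  lazy val kafkaPublisher: DataKafkaPublisher = serviceClient.implement[DataKafkaPublisher]

  // a plain val (not lazy) so the subscription is started eagerly on application startup
  val dataKafkaSubscriber: DataKafkaSubscriber = wire[DataKafkaSubscriber]
}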

This effectively allowed me to publish a message on the Kafka topic "data-in", which was then proxied and converted to a RecordData command before being issued to the entity.

However, it seems somewhat hacky to me. I am coupled to Kafka by Lagom internals, and I cannot swap the source of my data easily. For example, how would I consume external messages from RabbitMQ if I wanted to? What if I'm trying to consume from another Kafka cluster (a different one than the one used by Lagom)?

Edit #2: More docs

I've found a few articles in the Lagom docs, notably this one:

Consuming Topics from 3rd parties

You may want your Lagom service to consume data produced on services not implemented in Lagom. In that case, as described in the Service Clients section, you can create a third-party-service-api module in your Lagom project. That module will contain a Service Descriptor declaring the topic you will consume from. Once you have your ThirdPartyService interface and related classes implemented, you should add third-party-service-api as a dependency on your fancy-service-impl. Finally, you can consume from the topic described in ThirdPartyService as documented in the Subscribe to a topic section.
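
Following that description, here is a sketch of what such a third-party-service-api descriptor could look like; the service name, topic name and message shape below are purely illustrative, not something defined by the external system:

import com.lightbend.lagom.scaladsl.api.broker.Topic
import com.lightbend.lagom.scaladsl.api.{Descriptor, Service}
import play.api.libs.json.{Format, Json}

// Illustrative shape of whatever the external system publishes to the topic.
case class ExternalDataMessage(correlationId: String, payload: String)

object ExternalDataMessage {
  implicit val format: Format[ExternalDataMessage] = Json.format[ExternalDataMessage]
}

// Descriptor-only service: it is never implemented, it merely describes the external topic
// so that it can be injected and subscribed to via the Lagom Broker API.
trait ThirdPartyService extends Service {
  def externalDataTopic: Topic[ExternalDataMessage]

  override final def descriptor: Descriptor = {
    import Service._

    named("third-party-service")
      .withTopics(
        topic("external-data-topic", externalDataTopic)
      )
  }
}

The impl module then only needs a dependency on this api module and a subscriber like the DataKafkaSubscriber shown above.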

Jumna asked 4/2, 2019 at 10:39

An answer was provided by Alan Klikic on the Lightbend discussion forum here.

Part 1:

If you are only using an external Kafka cluster in your business service, then you can implement this using only the Lagom Broker API. So you need to:

  1. create an API with a service descriptor containing only the topic definition (this API is not being implemented)
  2. in your business service, configure kafka_native depending on your deployment (as I mentioned in the previous post); see the configuration sketch below
  3. in your business service, inject the service client for the API created in #1 and subscribe to it using the Lagom Broker API subscriber

Offset committing in the Lagom Broker API subscriber is handled out-of-the-box.
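
For step 2, a minimal sketch of what that kafka_native configuration could look like in the business service's application.conf, based on Lagom's documentation for connecting to an external Kafka server (the broker addresses are placeholders):

# Point the Lagom Kafka client directly at the external cluster instead of
# resolving "kafka_native" through the service locator.
lagom.broker.kafka {
  # an empty service-name disables the service-locator lookup
  service-name = ""
  # comma-separated list of external broker addresses
  brokers = "tcp://external-kafka-1:9092,tcp://external-kafka-2:9092"
}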

Part 2:

Kafka and AMQP consumer implementations require a persistent Akka stream, so you need to handle disconnects. This can be done in two ways:

  1. control the persistent Akka stream by wrapping it in an actor. You initialize your stream Flow in the actor's preStart and pipe the stream's completion back to the actor, which then stops itself. If the stream completes or fails, the actor stops. Then wrap the actor in a backoff supervisor with a restart strategy, which restarts the actor on completion or failure and reinitializes the Flow
  2. use the Akka Streams delayed-restart-with-backoff stage (RestartSource); a sketch of this option is shown at the end of this answer

Personally, I use #1 and have not tried #2 yet.

Initializing the backoff actor for #1 or the Flow for #2 can be done in your Lagom components trait (basically in the same place where you currently do your subscribe using the Lagom Broker API).

Be sure to set a consumer group when configuring the consumer, to avoid duplicate consumption. Like Lagom does, you can use the service name from the descriptor as the consumer group name.
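
As a sketch of option #2, here is roughly what wrapping an akka-stream-kafka consumer in Akka Streams' RestartSource with backoff could look like, committing offsets only after processing. The topic, broker address, group id and handle function are illustrative, and the API calls assume Akka 2.5 / Alpakka Kafka 1.x:

import akka.Done
import akka.actor.ActorSystem
import akka.kafka.scaladsl.{Committer, Consumer}
import akka.kafka.{CommitterSettings, ConsumerSettings, Subscriptions}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.RestartSource
import org.apache.kafka.common.serialization.StringDeserializer

import scala.concurrent.Future
import scala.concurrent.duration._

// Standalone consumer: restarts with backoff on completion or failure,
// commits offsets only after the record has been processed.
class ExternalDataConsumer(handle: String => Future[Done])(implicit system: ActorSystem) {

  private implicit val materializer: ActorMaterializer = ActorMaterializer()
  import system.dispatcher

  private val consumerSettings =
    ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
      .withBootstrapServers("external-kafka:9092") // placeholder broker address
      .withGroupId("aggregator")                   // consumer group, e.g. the descriptor's service name

  RestartSource
    .withBackoff(minBackoff = 3.seconds, maxBackoff = 30.seconds, randomFactor = 0.2) { () =>
      Consumer
        .committableSource(consumerSettings, Subscriptions.topics("data-in")) // external topic
        .mapAsync(1) { msg =>
          // process the record, then pass its offset downstream for committing
          handle(msg.record.value()).map(_ => msg.committableOffset)
        }
    }
    .runWith(Committer.sink(CommitterSettings(system)))
}

Such a consumer can be instantiated from the Lagom components trait, as mentioned above.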

Jumna answered 19/2, 2019 at 9:56

I don't use Lagom, so this is maybe just an idea. But as Akka Streams is part of Lagom (at least I assume so), getting from this solution to what you need should be easy.

I used akka-stream-kafka and it worked really nicely (I only built a prototype).

To consume the messages you would do something like:

Consumer
  .committableSource(
    consumerSettings(..),                        // Kafka config
    Subscriptions.topics("kafkaWsPathMsgTopic")) // topic to subscribe to
  .mapAsync(10) { msg =>
    business(msg.record) // do something
  }

Check the well-written documentation.

You can find my whole example here: PathMsgConsumer

Heraclitus answered 6/2, 2019 at 19:09
I am aware that Akka Streams comes with Lagom, and it has occurred to me that I might use it on the in/out boundaries, but I wanted to know if there is a recommended, idiomatic way to do this. Nevertheless, I'll try your approach! – Jumna
