Akka Persistence Query event stream and CQRS
Asked Answered
D

2

10

I'm trying to implement read side in my ES-CQRS architecture. Let's say I have a persistent actor like this:

object UserWrite {

  sealed trait UserEvent
  sealed trait State
  case object Uninitialized extends State
  case class User(username: String, password: String) extends State
  case class AddUser(user: User)
  case class UserAdded(user: User) extends UserEvent
  case class UserEvents(userEvents: Source[(Long, UserEvent), NotUsed])
  case class UsersStream(fromSeqNo: Long)
  case object GetCurrentUser

  def props = Props(new UserWrite)
}

class UserWrite extends PersistentActor {

  import UserWrite._

  private var currentUser: State = Uninitialized

  override def persistenceId: String = "user-write"

  override def receiveRecover: Receive = {
    case UserAdded(user) => currentUser = user
  }

  override def receiveCommand: Receive = {
    case AddUser(user: User) => persist(UserAdded(user)) {
      case UserAdded(`user`) => currentUser = user
    }
    case UsersStream(fromSeqNo: Long) => publishUserEvents(fromSeqNo)
    case GetCurrentUser => sender() ! currentUser
  }

  def publishUserEvents(fromSeqNo: Long) = {
    val readJournal = PersistenceQuery(context.system).readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)
    val userEvents = readJournal
      .eventsByPersistenceId("user-write", fromSeqNo, Long.MaxValue)
      .map { case EventEnvelope(_, _, seqNo, event: UserEvent) => seqNo -> event }
    sender() ! UserEvents(userEvents)
  }
}

As far as I understand, each time when event gets persisted, we can publish it via Akka Persistence Query. Now, I'm not sure what would be a proper way to subscribe on these events so I can persist it in my read side database? One of the ideas is to initially send a UsersStream message from my read side actor to UserWrite actor and "sink" events in that read actor.

EDIT

Following suggestion of @cmbaxter, I implemented read side this way:

object UserRead {

  case object GetUsers
  case class GetUserByUsername(username: String)
  case class LastProcessedEventOffset(seqNo: Long)
  case object StreamCompleted

  def props = Props(new UserRead)
}

class UserRead extends PersistentActor {
  import UserRead._

  var inMemoryUsers = Set.empty[User]
  var offset        = 0L

  override val persistenceId: String = "user-read"

  override def receiveRecover: Receive = {
    // Recovery from snapshot will always give us last sequence number
    case SnapshotOffer(_, LastProcessedEventOffset(seqNo)) => offset = seqNo
    case RecoveryCompleted                                 => recoveryCompleted()
  }

  // After recovery is being completed, events will be projected to UserRead actor
  def recoveryCompleted(): Unit = {
    implicit val materializer = ActorMaterializer()
    PersistenceQuery(context.system)
      .readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)
      .eventsByPersistenceId("user-write", offset + 1, Long.MaxValue)
      .map {
        case EventEnvelope(_, _, seqNo, event: UserEvent) => seqNo -> event
      }
      .runWith(Sink.actorRef(self, StreamCompleted))
  }

  override def receiveCommand: Receive = {
    case GetUsers                    => sender() ! inMemoryUsers
    case GetUserByUsername(username) => sender() ! inMemoryUsers.find(_.username == username)
    // Match projected event and update offset
    case (seqNo: Long, UserAdded(user)) =>
      saveSnapshot(LastProcessedEventOffset(seqNo))
      inMemoryUsers += user
  }
}

There are some issues like: Event stream seems to be slow. I.e. UserRead actor can answer with set of users before the newly added user is being saved.

EDIT 2

I increased refresh interval of cassandra query journal which more less solved issue with slow event stream. It appears that Cassandra event journal is by default, being polled each 3 seconds. In my application.conf I added:

cassandra-query-journal {
  refresh-interval = 20ms
}

EDIT 3

Actually, do not decrease refresh interval. That will increase memory usage but that's not dangerous, neither a point. In general concept of CQRS is that write and read side are async. Therefore, after you write data will never be available immediately for reading. Dealing with UI? I just open the stream and push data via server sent events after the read side acknowledges them.

Debris answered 7/7, 2016 at 13:28 Comment(2)
I would just move the read journal based code into your read side projection actor instead of sending it a message with a Source on it. Then process that stream over in that read side projection actor and project that information into Elasticsearch.Unpractical
@Unpractical I did that. It seems to be a very good idea. I updated my question and still accepting suggestions since I still have some doubts.Debris
N
6

There are some ways to do this. For example, in my app i have an actor in my query side that have a PersistenceQuery that is consistently looking for changes, but you can have a thread with the same query too. The thing is to maintain the stream open to be able to read the persisted event as soon as it happens

val readJournal =
PersistenceQuery(system).readJournalFor[CassandraReadJournal](
  CassandraReadJournal.Identifier)

// issue query to journal
val source: Source[EventEnvelope, NotUsed] =
  readJournal.eventsByPersistenceId(s"MyActorId", 0, Long.MaxValue)

// materialize stream, consuming events
implicit val mat = ActorMaterializer()
source.map(_.event).runForeach{
  case userEvent: UserEvent => {
    doSomething(userEvent)
  }
}

Instead of this, you can have a timer that raises a PersistenceQuery and stores new events, but i think that having a stream open is the best way

Neolithic answered 11/7, 2016 at 10:58 Comment(0)
A
4

Although the solution with PersistenceQuery only was approved, it contains the following problems:

  1. It is partial, there is only method to read EventEnvelopes presented.
  2. It can't work with State snapshots and ,as result, CQRS Reader Part should over all persisted events persisted ever.

The first solution is better, but has the following issues:

  1. It is too complicated. It causes to user unnecessary dealing with sequence numbers.
  2. The code deals with state (query/update) too coupled with the Actors implementation.

There is exists simpler one:

import akka.NotUsed
import akka.actor.{Actor, ActorLogging}
import akka.persistence.query.{EventEnvelope, PersistenceQuery}
import akka.persistence.query.javadsl.{EventsByPersistenceIdQuery, ReadJournal}
import akka.persistence._
import akka.stream.ActorMaterializer
import akka.stream.javadsl.Source

/**
  * Created by alexv on 4/26/2017.
  */
class CQRSTest {

  // User Command, will be transformed to User Event
  sealed trait UserCommand
  // User Event
  // let's assume some conversion from Command to event here
  case class PersistedEvent(command: UserCommand) extends Serializable
  // User State, for simplicity assumed that all State will be snapshotted
  sealed trait State extends Serializable{
    def clear(): Unit
    def updateState(event: PersistedEvent): Unit
    def validateCommand(command:UserCommand): Boolean
    def applyShapshot(newState: State): Unit
    def getShapshot() : State
  }
  case class SaveSnapshot()

  /**
    * Common code for Both reader and writer
    * @param state - State
    */
  abstract class CQRSCore(state: State) extends PersistentActor with ActorLogging {
    override def persistenceId: String = "CQRSPersistenceId"

    override def preStart(): Unit = {
      // Since the state is external and not depends to Actor's failure or restarts it should be cleared.
      state.clear()
    }

    override def receiveRecover: Receive = {
      case event : PersistedEvent => state.updateState(event)
      case SnapshotOffer(_, snapshot: State) => state.applyShapshot(snapshot)
      case RecoveryCompleted  => onRecoveryCompleted(super.lastSequenceNr)
    }

    abstract def onRecoveryCompleted(lastSequenceNr:Long)
  }

  class CQRSWriter(state: State) extends CQRSCore(state){
    override def preStart(): Unit = {
      super.preStart()
      log.info("CQRSWriter Started")
    }

    override  def onRecoveryCompleted(lastSequenceNr: Long): Unit = {
      log.info("Recovery completed")
    }

    override def receiveCommand: Receive = {
      case command: UserCommand =>
        if(state.validateCommand(command)) {
          // Persist events and call state.updateState with each persisted event
          persistAll(List(PersistedEvent(command)))(state.updateState)
        }
        else {
          log.error("Validation Failed for Command: {}", command)
        }
      case SaveSnapshot => saveSnapshot(state.getShapshot())
      case SaveSnapshotSuccess(metadata) => log.debug("Saved snapshot successfully: {}", metadata)
      case SaveSnapshotFailure(metadata, reason) => log.error("Failed to Save snapshot: {} . Reason: {}", metadata, reason)
    }
  }

  class CQRSReader(state: State) extends CQRSCore(state){
    override def preStart(): Unit = {
      super.preStart()
      log.info("CQRSReader Started")
    }

    override  def onRecoveryCompleted(lastSequenceNr: Long): Unit = {
      log.info("Recovery completed, Starting QueryStream")

      // ReadJournal type not specified here, so may be used with Cassandra or In-memory Journal (for Tests)
      val readJournal = PersistenceQuery(context.system).readJournalFor(
        context.system.settings.config.getString("akka.persistence.query.my-read-journal"))
        .asInstanceOf[ReadJournal
        with EventsByPersistenceIdQuery]
      val source: Source[EventEnvelope, NotUsed] = readJournal.eventsByPersistenceId(
        OrgPersistentActor.orgPersistenceId, lastSequenceNr + 1, Long.MaxValue)
      source.runForeach({ envelope => state.updateState(envelope.event.asInstanceOf[PersistedEvent]) },ActorMaterializer())

    }

    // Nothing received since it is Reader only
    override def receiveCommand: Receive = Actor.emptyBehavior
  }
}
Alanson answered 26/4, 2017 at 13:29 Comment(1)
Assumed CQRSRead Part should be queried via its state directly. CQRSReader makes sure, that the state is similar to CQRSWriter's one. I haven't implemented the Concrete state here, but it can be anything from simple Hash Map till In-memory Graph DBAlanson

© 2022 - 2024 — McMap. All rights reserved.