I'm trying to implement read side in my ES-CQRS architecture. Let's say I have a persistent actor like this:
object UserWrite {
sealed trait UserEvent
sealed trait State
case object Uninitialized extends State
case class User(username: String, password: String) extends State
case class AddUser(user: User)
case class UserAdded(user: User) extends UserEvent
case class UserEvents(userEvents: Source[(Long, UserEvent), NotUsed])
case class UsersStream(fromSeqNo: Long)
case object GetCurrentUser
def props = Props(new UserWrite)
}
class UserWrite extends PersistentActor {
import UserWrite._
private var currentUser: State = Uninitialized
override def persistenceId: String = "user-write"
override def receiveRecover: Receive = {
case UserAdded(user) => currentUser = user
}
override def receiveCommand: Receive = {
case AddUser(user: User) => persist(UserAdded(user)) {
case UserAdded(`user`) => currentUser = user
}
case UsersStream(fromSeqNo: Long) => publishUserEvents(fromSeqNo)
case GetCurrentUser => sender() ! currentUser
}
def publishUserEvents(fromSeqNo: Long) = {
val readJournal = PersistenceQuery(context.system).readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)
val userEvents = readJournal
.eventsByPersistenceId("user-write", fromSeqNo, Long.MaxValue)
.map { case EventEnvelope(_, _, seqNo, event: UserEvent) => seqNo -> event }
sender() ! UserEvents(userEvents)
}
}
As far as I understand, each time when event gets persisted, we can publish it via Akka Persistence Query
. Now, I'm not sure what would be a proper way to subscribe on these events so I can persist it in my read side database? One of the ideas is to initially send a UsersStream
message from my read side actor to UserWrite
actor and "sink" events in that read actor.
EDIT
Following suggestion of @cmbaxter, I implemented read side this way:
object UserRead {
case object GetUsers
case class GetUserByUsername(username: String)
case class LastProcessedEventOffset(seqNo: Long)
case object StreamCompleted
def props = Props(new UserRead)
}
class UserRead extends PersistentActor {
import UserRead._
var inMemoryUsers = Set.empty[User]
var offset = 0L
override val persistenceId: String = "user-read"
override def receiveRecover: Receive = {
// Recovery from snapshot will always give us last sequence number
case SnapshotOffer(_, LastProcessedEventOffset(seqNo)) => offset = seqNo
case RecoveryCompleted => recoveryCompleted()
}
// After recovery is being completed, events will be projected to UserRead actor
def recoveryCompleted(): Unit = {
implicit val materializer = ActorMaterializer()
PersistenceQuery(context.system)
.readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)
.eventsByPersistenceId("user-write", offset + 1, Long.MaxValue)
.map {
case EventEnvelope(_, _, seqNo, event: UserEvent) => seqNo -> event
}
.runWith(Sink.actorRef(self, StreamCompleted))
}
override def receiveCommand: Receive = {
case GetUsers => sender() ! inMemoryUsers
case GetUserByUsername(username) => sender() ! inMemoryUsers.find(_.username == username)
// Match projected event and update offset
case (seqNo: Long, UserAdded(user)) =>
saveSnapshot(LastProcessedEventOffset(seqNo))
inMemoryUsers += user
}
}
There are some issues like: Event stream seems to be slow. I.e. UserRead
actor can answer with set of users before the newly added user is being saved.
EDIT 2
I increased refresh interval of cassandra query journal which more less solved issue with slow event stream. It appears that Cassandra event journal is by default, being polled each 3 seconds. In my application.conf
I added:
cassandra-query-journal {
refresh-interval = 20ms
}
EDIT 3
Actually, do not decrease refresh interval. That will increase memory usage but that's not dangerous, neither a point. In general concept of CQRS is that write and read side are async. Therefore, after you write data will never be available immediately for reading. Dealing with UI? I just open the stream and push data via server sent events after the read side acknowledges them.
Source
on it. Then process that stream over in that read side projection actor and project that information into Elasticsearch. – Unpractical