EventStream
With every actor system, there is a default event bus called EventStream. It can be used to provide default pub-sub functionality within that actor system (if you want custom pub-sub, you can implement your own event bus). EventStream is a sub-channel type event bus, which works with class hierarchies and you can subscribe to a group of event by subscribing to their common superclass.
abstract class AllKindsOfMusic { def artist: String }
case class Jazz(artist: String) extends AllKindsOfMusic
case class Electronic(artist: String) extends AllKindsOfMusic
class Listener extends Actor {
def receive = {
case m: Jazz => println(s"${self.path.name} is listening to: ${m.artist}")
case m: Electronic => println(s"${self.path.name} is listening to: ${m.artist}")
}
}
val jazzListener = system.actorOf(Props[Listener])
val musicListener = system.actorOf(Props[Listener])
system.eventStream.subscribe(jazzListener, classOf[Jazz])
system.eventStream.subscribe(musicListener, classOf[AllKindsOfMusic])
// only musicListener gets this message, since it listens to *all* kinds of music:
system.eventStream.publish(Electronic("Parov Stelar"))
// jazzListener and musicListener will be notified about Jazz:
system.eventStream.publish(Jazz("Sonny Rollins"))
Another example:
//Message and event classes
case class UpdateAccountBalance(userId:Long, amount:Long)
case class BalanceUpdated(userId:Long)
//Actor that performs account updates
class AccountManager extends Actor{
val dao = new AccountManagerDao
def receive = {
case UpdateAccountBalance(userId, amount) =>
val res = for(result <- dao.updateBalance(userId, amount)) yield{
context.system.eventStream.publish(BalanceUpdated(userId))
result
}
sender ! res
}
}
//Actor that manages a cache of account balance data
class AccountCacher extends Actor{
val cache = new AccountCache
override def preStart = {
context.system.eventStream.subscribe(context.self, classOf[BalanceUpdated])
}
def receive = {
case BalanceUpdated(userId) =>
cache.remove(userId)
}
}
//Actor that checks balance after an update to warn of low balance
class LowBalanceChecker extends Actor{
val dao = new LowBalanceDao
override def preStart = {
context.system.eventStream.subscribe(context.self, classOf[BalanceUpdated])
}
def receive = {
case BalanceUpdated(userId) =>
for{
balance <- dao.getBalance(userId)
theshold <- dao.getBalanceThreshold(userId)
if (balance < threshold)
}{
sendBalanceEmail(userId, balance)
}
}
}