EventStream

Every actor system has a default event bus called the EventStream. It provides default pub-sub functionality within that actor system (if you need custom pub-sub, you can implement your own event bus). The EventStream is a subchannel-type event bus: it works with class hierarchies, so you can subscribe to a whole group of events by subscribing to their common superclass.

// Root of the music event hierarchy. Every event exposes the artist it is about.
// Sealed so that all kinds of music are declared in this file and matches over the
// hierarchy can be checked for exhaustiveness by the compiler.
sealed abstract class AllKindsOfMusic { def artist: String }

// Music event for a jazz artist. Final: case classes are not meant to be extended.
final case class Jazz(artist: String) extends AllKindsOfMusic

// Music event for an electronic artist. Final: case classes are not meant to be extended.
final case class Electronic(artist: String) extends AllKindsOfMusic

// Actor that prints one line for every Jazz or Electronic message it receives.
// Any other message is not matched by this receive block.
class Listener extends Actor {
  // Prints this actor's path name followed by the artist being listened to.
  private def announce(artist: String): Unit =
    println(s"${self.path.name} is listening to: ${artist}")

  def receive = {
    case Jazz(artist)       => announce(artist)
    case Electronic(artist) => announce(artist)
  }
}

// Two Listener actors: one will follow Jazz only, the other every kind of music.
val jazzListener = system.actorOf(Props[Listener])
val musicListener = system.actorOf(Props[Listener])

// Register each listener on the system's event stream for the event class it cares about.
val stream = system.eventStream
stream.subscribe(jazzListener, classOf[Jazz])
stream.subscribe(musicListener, classOf[AllKindsOfMusic])

// Electronic is not a Jazz, so only musicListener (subscribed to the common superclass) gets it:
stream.publish(Electronic("Parov Stelar"))

// Jazz matches both subscriptions, so jazzListener and musicListener are both notified:
stream.publish(Jazz("Sonny Rollins"))

Another example:

// Message and event classes

// Command: change the balance of the account owned by `userId` by/to `amount`
// (the exact meaning of `amount` is decided by the DAO that handles it).
case class UpdateAccountBalance(userId: Long, amount: Long)

// Event: the balance of the account owned by `userId` has been updated.
case class BalanceUpdated(userId: Long)

// Actor that performs account updates and announces each one on the event stream
class AccountManager extends Actor {
  val dao = new AccountManagerDao

  def receive = {
    case UpdateAccountBalance(userId, amount) =>
      // When the DAO produces a result, publish BalanceUpdated for this account and
      // pass the DAO's result through unchanged.
      val res = dao.updateBalance(userId, amount).map { result =>
        context.system.eventStream.publish(BalanceUpdated(userId))
        result
      }

      // NOTE(review): `res` is whatever container updateBalance returns (its type is not
      // visible here); the container itself, not its unwrapped content, is what gets sent.
      // If that is a Future, confirm the sender really expects a Future as the reply.
      sender ! res
  }
}

// Actor that manages a cache of account balance data.
// It subscribes itself to BalanceUpdated events when it starts and drops the cached
// entry for the affected user whenever such an event arrives.
class AccountCacher extends Actor {
  val cache = new AccountCache

  // Declared with an explicit `(): Unit` so the override has the lifecycle hook's
  // signature no matter what `subscribe` returns; its result is intentionally discarded.
  override def preStart(): Unit = {
    context.system.eventStream.subscribe(self, classOf[BalanceUpdated])
  }

  def receive = {
    case BalanceUpdated(userId) =>
      // The cached balance for this user is stale now, so evict it.
      cache.remove(userId)
  }
}

// Actor that checks the balance after an update to warn of a low balance.
// It subscribes itself to BalanceUpdated events when it starts; for each event it looks
// up the user's balance and threshold and sends an email when the balance is below
// the threshold.
class LowBalanceChecker extends Actor {
  val dao = new LowBalanceDao

  // Declared with an explicit `(): Unit` so the override has the lifecycle hook's
  // signature no matter what `subscribe` returns; its result is intentionally discarded.
  override def preStart(): Unit = {
    context.system.eventStream.subscribe(self, classOf[BalanceUpdated])
  }

  def receive = {
    case BalanceUpdated(userId) =>
      for {
        balance   <- dao.getBalance(userId)
        // Bound as `threshold` so the guard below refers to this value
        // (it was previously bound under a misspelled name).
        threshold <- dao.getBalanceThreshold(userId)
        if balance < threshold
      } {
        // Runs only when both lookups produced a value and the guard passed.
        sendBalanceEmail(userId, balance)
      }
  }
}

results matching ""

    No results matching ""