Custom Event Bus

To build a custom event bus, you need to implement the EventBus trait together with one of the classification traits. The EventBus trait is defined as follows:

/**
 * Represents the base type for EventBuses
 * Internally has an Event type, a Classifier type and a Subscriber type
 *
 */
trait EventBus {
  type Event
  type Classifier
  type Subscriber

  //#event-bus-api
  /**
   * Attempts to register the subscriber to the specified Classifier
   * @return true if successful and false if not (because it was already
   *   subscribed to that Classifier, or otherwise)
   */
  def subscribe(subscriber: Subscriber, to: Classifier): Boolean

  /**
   * Attempts to deregister the subscriber from the specified Classifier
   * @return true if successful and false if not (because it wasn't subscribed
   *   to that Classifier, or otherwise)
   */
  def unsubscribe(subscriber: Subscriber, from: Classifier): Boolean

  /**
   * Attempts to deregister the subscriber from all Classifiers it may be subscribed to
   */
  def unsubscribe(subscriber: Subscriber): Unit

  /**
   * Publishes the specified Event to this bus
   */
  def publish(event: Event): Unit
  //#event-bus-api
}

An event bus implementation must define the following abstract types:

  • Event: Type of all the events published on this bus
  • Subscriber: Type of the subscribers allowed to register on this bus
  • Classifier: Classifier to be used in selecting subscribers for dispatching events

We also need to choose a classification type: lookup, subchannel, or scanning. Alternatively, you can implement a custom one.
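
As a rough illustration of the custom route, the sketch below implements EventBus directly, without any of the provided classification traits. The class name SimpleTopicBus, the (topic, message) tuple used as the event type, and the use of plain functions as subscribers are all assumptions made for this example; they are not part of Akka.

```scala
import akka.event.EventBus
import scala.collection.mutable.ListBuffer

// Hypothetical bus that implements EventBus directly (no classification trait).
class SimpleTopicBus extends EventBus {
  type Event = (String, String)    // (topic, message); an assumption for this sketch
  type Classifier = String         // subscribers register for a topic
  type Subscriber = String => Unit // plain callbacks instead of actors

  // Immutable map replaced under a lock; publish reads it without locking.
  @volatile private var subscribers = Map.empty[Classifier, Set[Subscriber]]

  override def subscribe(subscriber: Subscriber, to: Classifier): Boolean = synchronized {
    val current = subscribers.getOrElse(to, Set.empty[Subscriber])
    if (current.contains(subscriber)) false
    else {
      subscribers = subscribers.updated(to, current + subscriber)
      true
    }
  }

  override def unsubscribe(subscriber: Subscriber, from: Classifier): Boolean = synchronized {
    val current = subscribers.getOrElse(from, Set.empty[Subscriber])
    if (!current.contains(subscriber)) false
    else {
      subscribers = subscribers.updated(from, current - subscriber)
      true
    }
  }

  override def unsubscribe(subscriber: Subscriber): Unit = synchronized {
    subscribers = subscribers.map { case (topic, subs) => topic -> (subs - subscriber) }
  }

  // Delivers the message to every callback registered for the event's topic.
  override def publish(event: Event): Unit = {
    val (topic, message) = event
    subscribers.getOrElse(topic, Set.empty[Subscriber]).foreach(callback => callback(message))
  }
}

object SimpleTopicBusDemo {
  def run(): List[String] = {
    val bus = new SimpleTopicBus
    val received = ListBuffer.empty[String]
    val listener: String => Unit = msg => { received += msg; () }
    bus.subscribe(listener, "greetings")
    bus.publish(("time", "12:00"))      // no subscriber for "time"
    bus.publish(("greetings", "hello")) // delivered to listener
    received.toList
  }
}
```

In this sketch only the message published on the "greetings" topic reaches the listener. The classification traits described next provide the same bookkeeping in a more general and better tested form.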

LookupClassification

The LookupClassification trait is defined as follows:

/**
 * Maps Subscribers to Classifiers using equality on Classifier to store a Set of Subscribers (hence the need for compareSubscribers)
 * Maps Events to Classifiers through the classify-method (so it knows who to publish to)
 *
 * The compareSubscribers need to provide a total ordering of the Subscribers
 */
trait LookupClassification { this: EventBus =>

  protected final val subscribers = new Index[Classifier, Subscriber](mapSize(), new Comparator[Subscriber] {
    def compare(a: Subscriber, b: Subscriber): Int = compareSubscribers(a, b)
  })

  /**
   * This is a size hint for the number of Classifiers you expect to have (use powers of 2)
   */
  protected def mapSize(): Int

  /**
   * Provides a total ordering of Subscribers (think java.util.Comparator.compare)
   */
  protected def compareSubscribers(a: Subscriber, b: Subscriber): Int

  /**
   * Returns the Classifier associated with the given Event
   */
  protected def classify(event: Event): Classifier

  /**
   * Publishes the given Event to the given Subscriber
   */
  protected def publish(event: Event, subscriber: Subscriber): Unit

  def subscribe(subscriber: Subscriber, to: Classifier): Boolean = subscribers.put(to, subscriber)

  def unsubscribe(subscriber: Subscriber, from: Classifier): Boolean = subscribers.remove(from, subscriber)

  def unsubscribe(subscriber: Subscriber): Unit = subscribers.removeValue(subscriber)

  def publish(event: Event): Unit = {
    val i = subscribers.valueIterator(classify(event))
    while (i.hasNext) publish(event, i.next())
  }
}

The LookupClassification trait keeps an index that maps each classifier to a sorted set of subscribers (a ConcurrentSkipListSet). We need to provide a size hint for the expected number of classifiers via mapSize and an ordering between subscribers via compareSubscribers. When an event is published, its classifier is extracted using classify, and the event is then passed to publish once for each subscriber registered for that classifier. We must implement both classify and publish.

So when using LookupClassification we need to provide the implementation of the following:

  • mapSize: Size hint for the number of classifiers
  • compareSubscribers: Method to compare two subscribers
  • classify: Method to extract classifier from event/message
  • publish: Implementation of publish (to a single subscriber, will be called for each matching subscriber)

Example:

import akka.actor.ActorRef
import akka.event.EventBus
import akka.event.LookupClassification

final case class MsgEnvelope(topic: String, payload: Any)

/**
 * Publishes the payload of the MsgEnvelope when the topic of the
 * MsgEnvelope equals the String specified when subscribing.
 */
class LookupBusImpl extends EventBus with LookupClassification {
  type Event = MsgEnvelope
  type Classifier = String
  type Subscriber = ActorRef

  // is used for extracting the classifier from the incoming events  
  override protected def classify(event: Event): Classifier = event.topic

  // will be invoked for each event for all subscribers which registered themselves
  // for the event’s classifier
  override protected def publish(event: Event, subscriber: Subscriber): Unit = {
    subscriber ! event.payload
  }

  // must define a full order over the subscribers, expressed as expected from
  // `java.util.Comparator.compare`
  override protected def compareSubscribers(a: Subscriber, b: Subscriber): Int =
    a.compareTo(b)

  // determines the initial size of the index data structure
  // used internally (i.e. the expected number of different classifiers)
  override protected def mapSize(): Int = 128
}

Test:

val lookupBus = new LookupBusImpl
lookupBus.subscribe(testActor, "greetings")
lookupBus.publish(MsgEnvelope("time", System.currentTimeMillis()))
lookupBus.publish(MsgEnvelope("greetings", "hello"))
expectMsg("hello")
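
The test above relies on testActor and expectMsg from Akka TestKit. As a complementary sketch that runs without an actor system, the bus below uses plain callbacks as subscribers. The names NamedListener and CallbackLookupBus are hypothetical and chosen for this example only. It also illustrates that compareSubscribers decides what counts as the same subscriber: two listeners with the same name compare as equal, so the second subscribe call is expected to return false.

```scala
import akka.event.{ EventBus, LookupClassification }
import scala.collection.mutable.ListBuffer

// Hypothetical subscriber type for this sketch: a named callback.
final class NamedListener(val name: String, val onMessage: String => Unit)

class CallbackLookupBus extends EventBus with LookupClassification {
  type Event = (String, String) // (topic, payload); an assumption for this sketch
  type Classifier = String
  type Subscriber = NamedListener

  override protected def classify(event: Event): Classifier = event._1

  override protected def publish(event: Event, subscriber: Subscriber): Unit =
    subscriber.onMessage(event._2)

  // Listeners are ordered, and therefore identified, by name.
  override protected def compareSubscribers(a: Subscriber, b: Subscriber): Int =
    a.name.compareTo(b.name)

  override protected def mapSize(): Int = 16
}

object CallbackLookupBusDemo {
  def run(): (Boolean, Boolean, List[String]) = {
    val bus = new CallbackLookupBus
    val log = ListBuffer.empty[String]
    val first = new NamedListener("a", msg => { log += msg; () })
    val duplicate = new NamedListener("a", _ => ())

    val added = bus.subscribe(first, "greetings")
    val addedAgain = bus.subscribe(duplicate, "greetings") // same name as `first`
    bus.publish(("greetings", "hello"))
    bus.unsubscribe(first, "greetings")
    bus.publish(("greetings", "ignored")) // nobody is subscribed any more
    (added, addedAgain, log.toList)
  }
}
```

Since publish is invoked synchronously on the calling thread, the log contents can be checked right after the calls return.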

SubchannelClassification

The SubchannelClassification trait is defined as follows:

/**
 * Classification which respects relationships between channels: subscribing
 * to one channel automatically and idempotently subscribes to all sub-channels.
 */
trait SubchannelClassification { this: EventBus =>

  /**
   * The logic to form sub-class hierarchy
   */
  protected implicit def subclassification: Subclassification[Classifier]

  // must be lazy to avoid initialization order problem with subclassification
  private lazy val subscriptions = new SubclassifiedIndex[Classifier, Subscriber]()

  @volatile
  private var cache = Map.empty[Classifier, Set[Subscriber]]

  /**
   * Returns the Classifier associated with the given Event
   */
  protected def classify(event: Event): Classifier

  /**
   * Publishes the given Event to the given Subscriber
   */
  protected def publish(event: Event, subscriber: Subscriber): Unit

  def subscribe(subscriber: Subscriber, to: Classifier): Boolean = subscriptions.synchronized {
    val diff = subscriptions.addValue(to, subscriber)
    addToCache(diff)
    diff.nonEmpty
  }

  def unsubscribe(subscriber: Subscriber, from: Classifier): Boolean = subscriptions.synchronized {
    val diff = subscriptions.removeValue(from, subscriber)
    // removeValue(K, V) does not return the diff to remove from or add to the cache
    // but instead the whole set of keys and values that should be updated in the cache
    cache ++= diff
    diff.nonEmpty
  }

  def unsubscribe(subscriber: Subscriber): Unit = subscriptions.synchronized {
    removeFromCache(subscriptions.removeValue(subscriber))
  }

  def publish(event: Event): Unit = {
    val c = classify(event)
    val recv =
      if (cache contains c) cache(c) // c will never be removed from cache
      else subscriptions.synchronized {
        if (cache contains c) cache(c)
        else {
          addToCache(subscriptions.addKey(c))
          cache(c)
        }
      }
    recv foreach (publish(event, _))
  }

  // (the private helpers addToCache and removeFromCache are omitted here)
}

To use subchannel classification, we need to mix in SubchannelClassification and provide an implementation of Subclassification[V], which defines the hierarchy among instances of V, i.e. whether one instance of V is a subclass of another.

For example, let V be String and let V2 count as a subclass of V1 whenever V2 starts with V1, so that abcdef is a subclass of abc:

import akka.util.Subclassification

class StartsWithSubclassification extends Subclassification[String] {
  override def isEqual(x: String, y: String): Boolean =
    x == y

  override def isSubclass(x: String, y: String): Boolean =
    x.startsWith(y)
}
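
One consequence of a plain prefix check is that any topic sharing a prefix is treated as a sub-channel, so a topic such as orders-archive would count as a sub-channel of orders. If topics form a path-like hierarchy, a segment-aware check may be preferable. The following is a sketch under that assumption; PathSubclassification and the '/' separator are hypothetical choices, not part of Akka.

```scala
import akka.util.Subclassification

// Hypothetical variant: topics are '/'-separated paths, e.g. "orders/eu".
class PathSubclassification extends Subclassification[String] {
  override def isEqual(x: String, y: String): Boolean =
    x == y

  // x is a sub-channel of y if it equals y or lies below y in the path hierarchy.
  override def isSubclass(x: String, y: String): Boolean =
    x == y || x.startsWith(y + "/")
}

object PathSubclassificationDemo {
  private val paths = new PathSubclassification

  val nested: Boolean = paths.isSubclass("orders/eu", "orders")          // true
  val samePrefix: Boolean = paths.isSubclass("orders-archive", "orders") // false
}
```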

We can implement an event bus with subchannel classification as follows:

import akka.event.SubchannelClassification

/**
 * Publishes the payload of the MsgEnvelope when the topic of the
 * MsgEnvelope starts with the String specified when subscribing.
 */
class SubchannelBusImpl extends EventBus with SubchannelClassification {
  type Event = MsgEnvelope
  type Classifier = String
  type Subscriber = ActorRef

  // Subclassification is an object providing `isEqual` and `isSubclass`
  // to be consumed by the other methods of this classifier
  override protected val subclassification: Subclassification[Classifier] =
    new StartsWithSubclassification

  // is used for extracting the classifier from the incoming events  
  override protected def classify(event: Event): Classifier = event.topic

  // will be invoked for each event for all subscribers which registered
  // themselves for the event’s classifier
  override protected def publish(event: Event, subscriber: Subscriber): Unit = {
    subscriber ! event.payload
  }
}

Test:

val subchannelBus = new SubchannelBusImpl
subchannelBus.subscribe(testActor, "abc")
subchannelBus.publish(MsgEnvelope("xyzabc", "x"))
subchannelBus.publish(MsgEnvelope("bcdef", "b"))
subchannelBus.publish(MsgEnvelope("abc", "c"))
expectMsg("c")
subchannelBus.publish(MsgEnvelope("abcdef", "d"))
expectMsg("d")

ScanningClassification

For scanning classification, we need to mix in the ScanningClassification trait and implement the following methods:

  • compareClassifiers: Method to compare two classifiers
  • compareSubscribers: Method to compare two subscribers
  • matches: Method to check whether an event matches a classifier (invoked for each subscription, for every published event)
  • publish: Implementation of publish (to a single subscriber, will be called for each matching subscription)

Example:

Let's say our events are of type String and each subscription specifies the maximum length of the strings the subscriber wants to receive.

import akka.event.ScanningClassification

/**
 * Publishes String messages with length less than or equal to the length
 * specified when subscribing.
 */
class ScanningBusImpl extends EventBus with ScanningClassification {
  type Event = String
  type Classifier = Int
  type Subscriber = ActorRef

  // is needed for determining matching classifiers and storing them in an
  // ordered collection
  override protected def compareClassifiers(a: Classifier, b: Classifier): Int =
    if (a < b) -1 else if (a == b) 0 else 1

  // is needed for storing subscribers in an ordered collection  
  override protected def compareSubscribers(a: Subscriber, b: Subscriber): Int =
    a.compareTo(b)

  // determines whether a given classifier shall match a given event; it is invoked
  // for each subscription for all received events, hence the name of the classifier
  override protected def matches(classifier: Classifier, event: Event): Boolean =
    event.length <= classifier

  // will be invoked for each event for all subscribers which registered themselves
  // for a classifier matching this event
  override protected def publish(event: Event, subscriber: Subscriber): Unit = {
    subscriber ! event
  }
}

Test:

val scanningBus = new ScanningBusImpl
scanningBus.subscribe(testActor, 3)
scanningBus.publish("xyzabc")
scanningBus.publish("ab")
expectMsg("ab")
scanningBus.publish("abc")
expectMsg("abc")
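
Because a scanning bus checks every subscription against every published event, a subscriber that registers with several classifiers should receive an event once per matching subscription. The sketch below illustrates this with plain callbacks instead of actors; LengthListener and CallbackScanningBus are hypothetical names used only for this example.

```scala
import akka.event.{ EventBus, ScanningClassification }
import scala.collection.mutable.ListBuffer

// Hypothetical subscriber type for this sketch: a named callback.
final class LengthListener(val name: String, val onMessage: String => Unit)

class CallbackScanningBus extends EventBus with ScanningClassification {
  type Event = String
  type Classifier = Int // maximum accepted string length
  type Subscriber = LengthListener

  override protected def compareClassifiers(a: Classifier, b: Classifier): Int =
    Integer.compare(a, b)

  override protected def compareSubscribers(a: Subscriber, b: Subscriber): Int =
    a.name.compareTo(b.name)

  override protected def matches(classifier: Classifier, event: Event): Boolean =
    event.length <= classifier

  override protected def publish(event: Event, subscriber: Subscriber): Unit =
    subscriber.onMessage(event)
}

object CallbackScanningBusDemo {
  def run(): List[String] = {
    val bus = new CallbackScanningBus
    val log = ListBuffer.empty[String]
    val listener = new LengthListener("a", msg => { log += msg; () })
    bus.subscribe(listener, 3)
    bus.subscribe(listener, 5)
    bus.publish("ab")     // matches both subscriptions (3 and 5)
    bus.publish("abcd")   // matches only the subscription with classifier 5
    bus.publish("abcdef") // matches neither subscription
    log.toList
  }
}
```

If duplicate deliveries are not wanted, one option is to keep a single subscription per subscriber and let matches encode the full condition.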
