What is Akka cluster sharding?
Akka cluster sharding is a way of distributing your actors over a network of nodes and interacting with them transparently, without knowing or caring about their actual location in the cluster. It gives you out-of-the-box horizontal scalability when you have many more live actors than can fit in the memory of a single machine.
For example, if you are creating actors corresponding to your users, you have 10 million users in your system, and one node can effectively handle 1 million actors, then you can shard your user actors over 10 nodes, each node handling close to 1 million actors. However, you should have more shards than nodes (roughly 10 times as many) to make it easier to rebalance shards when nodes are added to or removed from the cluster.
In this example, you can have 100 shards, with each node hosting roughly 10 shards and each shard being responsible for roughly 100k actors. When a new node is added (making a total of 11 nodes), one shard from each existing node will be moved to it: 10 nodes will then have 9 shards each and one node will have 10 shards.
How are shards allocated to nodes?
A special actor, known as the shard coordinator, is responsible for allocating shards to nodes. The coordinator runs as a cluster singleton (i.e. there is only one instance of it running in the cluster). It knows the locations of all the shards and is responsible for moving shards around when nodes are added to or removed from the cluster.
Shard Allocation Strategy
When a new shard is to be added in an existing cluster, the coordinator uses a shard allocation strategy to select the node to allocate the shard to. The default implementation is ShardCoordinator.LeastShardAllocationStrategy, which allocates a new shard to the node with the least number of previously allocated shards.
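If you need different rebalancing behaviour, you can construct the strategy yourself and pass it to ClusterSharding.start (an overload accepting it is shown later). A minimal sketch in the classic API, with illustrative parameter values rather than the defaults:

import akka.cluster.sharding.ShardCoordinator

// rebalanceThreshold: a rebalance is triggered when the difference between
// the most- and least-loaded regions exceeds this many shards.
// maxSimultaneousRebalance: at most this many shards are moved at a time.
val allocationStrategy =
  new ShardCoordinator.LeastShardAllocationStrategy(
    rebalanceThreshold = 3,
    maxSimultaneousRebalance = 2)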
How are actors mapped to shards?
The mapping of actors to shards is application-specific and is for application developers to decide. A common approach is to map actors uniformly (for example, by hashing their ids) so that the total number of actors is divided roughly equally amongst all the shards.
So, in our earlier example of creating actors for users, you can use a hash of the user id to map a particular user actor to a shard. Let's say you have N shards; then you can get the shard id for a user U as follows:
shardId = math.abs(U.userId.hashCode) % N
You are required to provide the implementation of the actor-to-shard mapping strategy, and it can be anything application-specific. Just make sure that your workload is distributed across the cluster and that some nodes do not become hot, i.e. end up with a disproportionate number of actors. If this happens, a few of your nodes will be doing a lot of work while others sit idle. Again, it's up to you to analyze your application and workload and come up with an appropriate strategy; a sketch of such a mapping follows.
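Here the hash-based mapping above is expressed as a ShardRegion.ExtractShardId function. The GetUser message type and the shard count are assumptions made for illustration, not something prescribed by Akka:

import akka.cluster.sharding.ShardRegion

final case class GetUser(userId: Long) // hypothetical message type

val numberOfShards = 100 // N; keep this fixed once the system is live

// Hash the user id to a shard id, spreading actors roughly evenly.
val extractShardId: ShardRegion.ExtractShardId = {
  case GetUser(userId) =>
    (math.abs(userId.hashCode) % numberOfShards).toString
}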
What is ShardRegion actor?
On each node in the cluster that is supposed to host shards, a ShardRegion actor is created. The ShardRegion actor has the following responsibilities:
- Host the shards on the node and deliver incoming messages to actors in these shards
- Send outgoing messages to appropriate nodes/shards in the cluster
Whenever we want to send a message to any actor in the cluster, we have to send it to the local ShardRegion actor. The ShardRegion actor is, in turn, responsible for finding the location of the target actor (its corresponding shard and node) and delivering the message.
How are messages sent to actors?
Let's say we want to send a message M to the user actor U identified by id 16942. We first need an instance of the local ShardRegion actor (SRLocal). This instance can be obtained using the following code:
val shardRegion = ClusterSharding(system).shardRegion(shardName)
Then we'll send the message M to SRLocal. SRLocal first needs to extract the entity id (the user id in this case) and the shard id from the message content. You are required to provide the implementation of both of these functions for all messages you want to send to actors.
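A sketch of how these pieces fit together, assuming an ActorSystem in scope as system and reusing the hypothetical GetUser type and extractShardId function from the earlier sketch. The region must be started with ClusterSharding(system).start before shardRegion(shardName) can resolve it:

import akka.actor.{Actor, Props}
import akka.cluster.sharding.{ClusterSharding, ClusterShardingSettings, ShardRegion}

class UserActor extends Actor { // minimal entity actor, for illustration
  def receive = {
    case GetUser(userId) => // look up the user and reply ...
  }
}

// Extracts the entity id (the user id here) and the message to deliver.
val extractEntityId: ShardRegion.ExtractEntityId = {
  case msg @ GetUser(userId) => (userId.toString, msg)
}

// Registers the shard region on this node, wiring in both extractors.
val shardRegion = ClusterSharding(system).start(
  typeName = "users",
  entityProps = Props[UserActor],
  settings = ClusterShardingSettings(system),
  extractEntityId = extractEntityId,
  extractShardId = extractShardId)

// Sending M to user actor U (id 16942): the local region does the routing,
// creating the actor on demand if it does not exist yet.
shardRegion ! GetUser(16942)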
Once SRLocal has the target shard S, it'll check whether it already knows the location of the shard region hosting S. If yes, it'll send the message to that shard region. If not, it'll contact the shard coordinator and ask for the location of the shard region owning S. There can be two scenarios:
SRLocal itself hosts S
In this case, SRLocal will create a shard supervisor child actor for S, create the target entity (i.e. the user actor with id 16942), and deliver the message to this entity.
S is hosted on a remote shard region (SRRemote)
In this case, SRLocal will forward the message to SRRemote. SRRemote, upon receiving the message, will create a shard supervisor child actor for S, create the target entity (i.e. the user actor with id 16942), and deliver the message to this entity.
What happens to messages received while shard location is unknown?
As discussed, if the location of the target shard is unknown, the shard region actor will contact the coordinator for the location. If new messages are received for the shard while the shard region is determining the location, these messages will be buffered in memory. Once the location is determined, the buffered messages are delivered in the order they arrived.
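Note that this buffer is bounded: in the classic module its capacity is controlled by the akka.cluster.sharding.buffer-size setting, and messages arriving for the shard once the buffer is full are dropped.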
How are shards rebalanced?
When we add or remove nodes from the cluster, shards are rebalanced to divide the workload amongst the new set of nodes. The rebalancing process is carried out by the shard coordinator and involves the following steps:
Identify shards to be moved
The coordinator identifies the shards to be moved.
If a new node is added, some shards from existing nodes will be moved to this new node.
The strategy for selecting which shards to move is pluggable. By default, the ShardCoordinator.LeastShardAllocationStrategy implementation is used, which selects shards from the nodes with the most previously allocated shards.
If a node is removed, shards previously hosted on the removed node will be moved to the remaining nodes.
The strategy for selecting the nodes to move shards to is pluggable. By default, the ShardCoordinator.LeastShardAllocationStrategy implementation is used, which selects the nodes with the least number of previously allocated shards.
Notify handoff to all shard region actors
The coordinator notifies all shard region actors that a handoff of a shard has started. The shard region actors, in turn, stop forwarding messages targeted at that shard and instead buffer them so that they can be delivered after the shard has been successfully moved.
Also, if a shard region asks the coordinator for the location of the shard being moved, the coordinator will not reply, and the region will have to buffer messages until it can determine the new location.
Stop all entities in the shard
The shard region hosting the shard will stop all the entities within that shard by sending them the specified handOffStopMessage (PoisonPill by default). Once all the entities have terminated, the shard region notifies the coordinator.
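If PoisonPill is not appropriate, for example because the entity should flush its state before stopping, a custom stop message can be supplied through an overload of ClusterSharding.start. A sketch, reusing the assumed names (GetUser, extractEntityId, extractShardId, allocationStrategy, system) from the earlier sketches:

import akka.actor.{Actor, Props}
import akka.cluster.sharding.{ClusterSharding, ClusterShardingSettings}

case object Stop // hypothetical graceful-stop message

class StoppableUserActor extends Actor {
  def receive = {
    case GetUser(userId) => // ... handle as before
    case Stop =>
      // flush/clean up, then terminate so the handoff can complete
      context.stop(self)
  }
}

val shardRegion = ClusterSharding(system).start(
  typeName = "users",
  entityProps = Props[StoppableUserActor],
  settings = ClusterShardingSettings(system),
  extractEntityId = extractEntityId,
  extractShardId = extractShardId,
  allocationStrategy = allocationStrategy, // e.g. the strategy built earlier
  handOffStopMessage = Stop)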
Point the shard to its new home
By this time, the handoff is complete. The coordinator will determine the new node for the shard and answer location queries for this shard with the new home. All shard regions will then send their buffered messages to this new location. New entities will be created on the new node as messages are received.
How is the state of the entities transferred during rebalancing?
The in-memory state of the entities is not transferred during rebalancing. The entity at the old location is terminated, and the entity is created afresh at the new location. Any in-memory state associated with the entity at the old location will be lost.
If you want to maintain the previous state of the entities, you must use persistence. Then, when the entity is created at the new location, it can recover its previous state from the persisted event log.
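A sketch of this pattern with classic Akka persistence (the event type and state are assumptions; the key point is deriving persistenceId from the entity's actor name, which is its entity id):

import akka.persistence.PersistentActor

final case class UserUpdated(data: String) // hypothetical event

class PersistentUserActor extends PersistentActor {
  // The sharded entity's actor name is its entity id, so the same
  // persistenceId (and thus the same event log) is used wherever the
  // shard lands after a rebalance.
  override def persistenceId: String = "user-" + self.path.name

  private var state: List[String] = Nil

  override def receiveRecover: Receive = {
    case UserUpdated(data) => state = data :: state // rebuild from events
  }

  override def receiveCommand: Receive = {
    case data: String =>
      persist(UserUpdated(data)) { evt => state = evt.data :: state }
  }
}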
What if the coordinator fails?
The shard coordinator runs as a cluster singleton and maintains data about shard locations. If the coordinator fails, it'll be restarted on a different node, where it must be able to recover the data about shard locations.
For this, the state of shard locations is made persistent in the coordinator using either Distributed Data or persistence. When the coordinator is started on a different node, it is able to recover this data. During this failover, new requests for shard locations will not be answered, but shard regions can still continue to send messages to shards whose location was previously known.
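In the classic module, the choice between the two is made with the akka.cluster.sharding.state-store-mode setting ("ddata" or "persistence"); Distributed Data is the default in recent Akka versions.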