Scale Out with Cluster Sharding

Introduction

Anyone who works with data storage systems for long enough is bound to come across the term “sharding”. When data is sharded, it is spread across multiple backend stores, which allows more data to be stored than would fit on a single server node. The system still knows where any shard, or piece of that data, lives, based on the key of the shard.

Akka’s Cluster Sharding takes an approach similar to standard data sharding: it spreads actor instances over a set of nodes and locates each instance by its key, or persistence ID. This ensures that, for a given persistence ID, at most one actor instance representing it is in memory anywhere in the cluster.

Cluster Sharding – The Basics

Within cluster sharding, the core concept to understand is the ShardRegion. It is a special actor that starts on each cluster node and hosts persistent actors. It acts as a local proxy that gives access to the realm of sharded actors. For every entity type that is to be sharded, one ShardRegion actor needs to be started.

When a request comes in for a particular entity, it is routed through the ShardRegion actor. The ShardRegion looks at the request and, via two user-defined functions, extracts its entity ID and shard ID. This helps Akka figure out which shard is responsible for the request. If the ShardRegion does not know about this shard ID yet, it asks the central ShardCoordinator for the shard’s location. The ShardCoordinator uses Akka Persistence to persist all shard locations.

The ShardCoordinator can respond in one of two ways:

  • It responds that the shard is local to the requesting ShardRegion. In this case, the ShardRegion creates the shard and a local supervisor, and delegates the request to it. The supervisor then creates the entity actor as a child.
  • It responds that a remote ShardRegion owns the requested shard. The non-owner ShardRegion forwards the request to the owner ShardRegion, where the process starts again.

Figure: Information flow when the ShardRegion is local and when the ShardRegion is remote

Once Akka resolves the home of a particular shard, the originally requesting ShardRegion caches this knowledge so that it can skip any further calls to the ShardCoordinator for the same shard.
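
The lookup and caching flow described above can be modelled in a few lines of plain Scala. This is only a toy sketch: the Coordinator and Region classes, the lookups counter and the "fewest shards" allocation rule are assumptions made for illustration and are not Akka's API or its exact behaviour.

```scala
import scala.collection.mutable

// Toy model of the lookup flow; these classes are illustrative, not Akka's API.
object ShardLookupSketch {
  type ShardId = String
  type RegionName = String

  // Stands in for the ShardCoordinator: the single authority on shard locations.
  final class Coordinator(regions: Vector[RegionName]) {
    private val locations = mutable.Map.empty[ShardId, RegionName]
    var lookups = 0 // counts how often a region had to ask

    // Allocate an unknown shard to the region with the fewest shards, then remember it.
    def locate(shard: ShardId): RegionName = {
      lookups += 1
      locations.getOrElseUpdate(shard, regions.minBy(r => locations.values.count(_ == r)))
    }
  }

  // Stands in for a ShardRegion: resolves a shard once, then answers from its cache.
  final class Region(val name: RegionName, coordinator: Coordinator) {
    private val cache = mutable.Map.empty[ShardId, RegionName]

    // Returns "local" or "forward to <owner>" for a message addressed to the shard.
    def route(shard: ShardId): String = {
      val owner = cache.getOrElseUpdate(shard, coordinator.locate(shard))
      if (owner == name) "local" else s"forward to $owner"
    }
  }
}
```

Routing a second message to an already resolved shard is answered from the region's cache, so the coordinator's lookups counter does not increase.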

In cluster sharding, a process called rebalancing happens every time a node joins or leaves the cluster. It moves entire shards to other nodes to balance out the number of shards each node is responsible for.
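
To make the idea of rebalancing concrete, here is a minimal sketch in plain Scala. It assumes a non-empty allocation map and a deliberately simple rule (move one shard at a time from the fullest node to the emptiest); the names are hypothetical and Akka's real allocation strategy is more involved.

```scala
import scala.annotation.tailrec

// Toy rebalancing step; a simplification, not Akka's actual allocation strategy.
object RebalanceSketch {
  type Node = String
  type ShardId = Int

  // Move one shard at a time from the fullest node to the emptiest
  // until their shard counts differ by at most one.
  // Assumes the allocation map is non-empty.
  @tailrec
  def rebalance(allocation: Map[Node, Set[ShardId]]): Map[Node, Set[ShardId]] = {
    val (fullest, most) = allocation.maxBy(_._2.size)
    val (emptiest, least) = allocation.minBy(_._2.size)
    if (most.size - least.size <= 1) allocation
    else {
      val moved = most.min // pick deterministically for the example
      rebalance(
        allocation
          .updated(fullest, most - moved)
          .updated(emptiest, least + moved)
      )
    }
  }
}
```

With three nodes holding four, two and zero shards (the last one standing in for a node that has just joined), the sketch ends with two shards on each node, and no shard is lost or duplicated along the way.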

Refactoring the code for Cluster Sharding

In order to enable cluster sharding for an actor type, Akka needs a few things:

  • A way to retrieve the unique identifier of the actor from a message, because actors receive messages via the shard region
  • A way to compute a stable shard identifier from messages sent to actors
  • The Props of the entity actor, so that Akka can create instances when required
  • Sharding configuration

Consider an actor, for example a Merchant, that needs many live instances once the payment platform is active and popular. Cluster Sharding applies to such use cases. To start cluster sharding on a node, the start method of the extension can be used:

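  import akka.actor.{ActorRef, ActorSystem}
  import akka.cluster.sharding.{ClusterSharding, ClusterShardingSettings}

  object Merchant {
    // Starts the shard region for Merchant entities on this node.
    // props(), extractEntityId and extractShardId are assumed to be defined in this object.
    def startSharding(system: ActorSystem): ActorRef = {
      ClusterSharding(system).start(
        typeName = "ShardedMerchant",
        entityProps = props(),
        settings = ClusterShardingSettings(system),
        extractEntityId = extractEntityId,
        extractShardId = extractShardId
      )
    }
  }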

The above method returns an ActorRef, a reference to the shard region actor.

For Akka to figure out the unique entity identifier and stable shard identifier from messages, we need to implement extractEntityId and extractShardId methods.

If every message sent to the ShardedMerchant actor implements the following trait:

sealed trait MerchantCommand {
  val order: Order
}

then we can obtain a unique identifier of a merchant as:

val extractEntityId: ShardRegion.ExtractEntityId = {
  case m: MerchantCommand => (m.order.account.a, m)
}

Similarly, we can extract Shard Id using a simple algorithm like:

val numberOfShards = 100
val extractShardId: ShardRegion.ExtractShardId = {
  // abs is applied after the modulo so that an Int.MinValue hash code cannot yield a negative ID
  case m: MerchantCommand => math.abs(m.order.account.hashCode() % numberOfShards).toString
}
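
The two properties that matter for a shard ID are that it is stable (the same account always maps to the same shard) and that it stays within the configured number of shards. The following sketch checks both in plain Scala. The Account case class is a hypothetical stand-in for the account type used in this post, and the sketch applies abs after the modulo so that extreme hash codes cannot produce a negative ID.

```scala
object ShardIdSketch {
  // Hypothetical stand-in for the account type used in the article.
  final case class Account(id: String)

  val numberOfShards = 100

  // abs is applied after the modulo so that an Int.MinValue hash code
  // cannot produce a negative shard ID.
  def shardIdFor(account: Account): String =
    math.abs(account.hashCode % numberOfShards).toString
}
```

Because the shard ID depends only on the account's hash code, two messages for the same account always land in the same shard, whichever node receives them first.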

We can simply start the sharded actor as:

val merchant = Merchant.startSharding(context.system)

Now, we can use this actor ref wherever the actual merchant actor is required.

Conclusion

Akka cluster sharding is a powerful feature that, combined with Akka Persistence, allows us to scale out entities across several nodes while keeping all state in memory. With cluster sharding in play, Akka provides an architecture that separates our application’s scale-agnostic transactional code from the scale-aware code that handles distributed communication.

Written by 

Software Consultant with 2+ years of experience, with a strong inclination towards Big Data Analytics and Data Science.
