Introduction to Alpakka DynamoDB

Reading Time: 3 minutes


This AWS DynamoDB connector provides a flow for streaming DynamoDB requests, built on Akka Streams and the AWS Java DynamoDB SDK.

In this blog, we will work with version 1.1.2.

So, let's get started.

1. Add Library Dependencies

Set up an sbt project and add these library dependencies to build.sbt:

libraryDependencies += "com.typesafe.akka" %% "akka-actor" % "2.5.25"
libraryDependencies += "com.lightbend.akka" %% "akka-stream-alpakka-dynamodb" % "1.1.2"
libraryDependencies += "com.amazonaws" % "aws-java-sdk-dynamodb" % "1.11.656"
libraryDependencies += "com.typesafe.akka" %% "akka-stream" % "2.5.25"

2. Set up application.conf

Create an application.conf file and place it in src/main/resources/:

akka.stream.alpakka.dynamodb {
  region = "eu-west-1"
  host = "localhost"
  port = 8000
  tls = false
  parallelism = 32
  credentials {
    access-key-id = "dummy-access-key"
    secret-key-id = "dummy-secret-key"
  }
}

3. Create functions to operate on DynamoDB using Akka Streams

Here, we create a class containing methods that operate on DynamoDB using Akka Streams. We will use these methods to perform CRUD operations.

import akka.stream.ActorMaterializer
import akka.stream.alpakka.dynamodb.AwsOp
import akka.stream.alpakka.dynamodb.scaladsl.DynamoDb
import com.amazonaws.services.dynamodbv2.model._

import scala.collection.JavaConverters._
import scala.concurrent.Future

/**
 * A class to operate with DynamoDB using aws sdk and akka streams
 * It does not use scanamo alpakka
 *
 * @param tableName table name
 * @param materializer actor materializer (for akka streams)
 */
class DatabaseOperation(tableName: String)(implicit val materializer: ActorMaterializer) {

  def createTable(
                   keySchemaElements: Seq[KeySchemaElement],
                   attributeDefinitions: Seq[AttributeDefinition],
                   provisionedThroughput: ProvisionedThroughput
                 ): Future[CreateTableResult] = {
    DynamoDb.single(
      new CreateTableRequest()
        .withTableName(tableName)
        .withKeySchema(keySchemaElements.asJava)
        .withAttributeDefinitions(attributeDefinitions.asJava)
        .withProvisionedThroughput(provisionedThroughput)
    )
  }

  def getItem(keyToGet: Map[String, AttributeValue]): Future[GetItemResult] = {
    DynamoDb.single(
      new GetItemRequest()
        .withTableName(tableName)
        .withKey(keyToGet.asJava)
    )
  }

  def putItem(itemToPut:Map[String,AttributeValue]): Future[PutItemResult] = {
    DynamoDb.single(
      new PutItemRequest()
        .withTableName(tableName)
        .withItem(itemToPut.asJava)
    )
  }
}
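The class above covers create, read, and write. To round out CRUD, a delete method can be added along the same lines; this is a sketch using the same pattern and imports as the methods above (DeleteItemRequest and DeleteItemResult come from the same com.amazonaws.services.dynamodbv2.model package):

```scala
// Sketch: a delete counterpart to putItem/getItem, to be added inside
// DatabaseOperation. Assumes the same imports as the class above.
def deleteItem(keyToDelete: Map[String, AttributeValue]): Future[DeleteItemResult] = {
  DynamoDb.single(
    new DeleteItemRequest()
      .withTableName(tableName)
      .withKey(keyToDelete.asJava)
  )
}
```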

In the above code:

  1. keySchemaElements defines the partition key and sort key.
  2. attributeDefinitions defines the data types of the attributes used as table keys, including both the partition key and the sort key.
  3. provisionedThroughput defines the AWS DynamoDB provisioned throughput (read and write capacity units).
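For a table that needs both a partition key and a sort key, the schema simply takes two KeySchemaElements. Here is a sketch with a hypothetical Orders table (the userId and orderDate names are illustrative, not part of this post's example):

```scala
// Hypothetical composite-key schema: userId is the partition (HASH) key,
// orderDate is the sort (RANGE) key. Both must also appear in the
// attribute definitions with their data types ("S" = string).
val compositeKeySchema: List[KeySchemaElement] = List(
  new KeySchemaElement("userId", KeyType.HASH),
  new KeySchemaElement("orderDate", KeyType.RANGE)
)
val compositeAttributeDefinitions: List[AttributeDefinition] = List(
  new AttributeDefinition("userId", "S"),
  new AttributeDefinition("orderDate", "S")
)
```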

4. Initialise ActorSystem and DatabaseOperation

Initialise the ActorSystem, ActorMaterializer, ExecutionContext, and the DatabaseOperation class:

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import scala.concurrent.ExecutionContext

implicit val system: ActorSystem = ActorSystem("akka-alpakka-dynamoDB")
implicit val materializer: ActorMaterializer = ActorMaterializer()
implicit val ec: ExecutionContext = system.dispatcher

val tableName: String = "User"

val databaseOperation: DatabaseOperation = new DatabaseOperation(tableName)

5. Create a table and define the key schema

To create a table in DynamoDB, we need to specify the key schema and the provisioned throughput:

import scala.util.{Failure, Success}

lazy val keySchemaElements: List[KeySchemaElement] = List(
  new KeySchemaElement("email", KeyType.HASH)
)
lazy val attributeDefinitions: List[AttributeDefinition] = List(
  new AttributeDefinition("email", "S")
)
lazy val provisionedThroughput: ProvisionedThroughput =
  new ProvisionedThroughput(20L, 20L)

databaseOperation.createTable(keySchemaElements, attributeDefinitions, provisionedThroughput).onComplete {
  case Success(createTableResult: CreateTableResult) => println(createTableResult)
  case Failure(exception) => exception.printStackTrace()
}
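Note that createTable returns before the table is actually ready: DynamoDB creates tables asynchronously and reports progress through DescribeTable. A sketch of checking readiness before writing, assuming the same imports as above plus an implicit ExecutionContext in scope:

```scala
// Sketch: readiness check via DescribeTable. The table status moves from
// "CREATING" to "ACTIVE" once DynamoDB has finished provisioning it.
// Assumes an implicit ExecutionContext (e.g. system.dispatcher) is in scope.
def isTableActive: Future[Boolean] =
  DynamoDb.single(new DescribeTableRequest().withTableName(tableName))
    .map(_.getTable.getTableStatus == "ACTIVE")
```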

6. Inserting and querying data

To insert data, we create a map of attribute names to their values:

val item: Map[String, AttributeValue] = Map(
  "email" -> new AttributeValue("example.example@example.com"),
  "name" -> new AttributeValue("example")
)

databaseOperation.putItem(item).onComplete {
  case Success(putItemResult) => println(putItemResult)
  case Failure(exception) => exception.printStackTrace()
}

Now, for querying data:

import scala.collection.JavaConverters._

val getItem: Map[String, AttributeValue] = Map(
  "email" -> new AttributeValue("example.example@example.com")
)

databaseOperation.getItem(getItem).onComplete {
  case Success(getItemResult) => getItemResult.getItem.asScala.foreach(println)
  case Failure(exception) => exception.printStackTrace()
}

So, from the above, we learned how to use Akka Streams with DynamoDB.

To set up a local DynamoDB instance, click here.