
Alpakka DynamoDB
This AWS DynamoDB connector provides a flow for streaming DynamoDB requests, using Akka stream and AWS java DynamoDB SDK.
And in this blog, we are going to work on version 1.1.2
So, let’s get it started-
1. Add Library Dependencies: –
Setup sbt
project and add these library dependencies in build.sbt
libraryDependencies += "com.typesafe.akka" %% "akka-actor" % "2.5.25"
libraryDependencies += "com.lightbend.akka" %% "akka-stream-alpakka-dynamodb" % "1.1.2"
libraryDependencies += "com.amazonaws" % "aws-java-sdk-dynamodb" % "1.11.656"
libraryDependencies += "com.typesafe.akka" %% "akka-stream" % "2.5.25"
2. Setup application.conf
: –
Create application.conf
file and place it inside the src\main\resources\
akka.stream.alpakka.dynamodb {
region = "eu-west-1"
host = "localhost"
port = 8000
tls = false
parallelism = 32
credentials {
access-key-id = "dummy-access-key"
secret-key-id = "dummy-secret-key"
}
}
3. Create some functions to operate with DynamoDB
using Akka Stream
Here, we are going to create a class which contain some methods that will operate with DynamoDB using Akka Streams and we will be going to use these methods to perform CRUD
import akka.stream.ActorMaterializer
import akka.stream.alpakka.dynamodb.AwsOp
import akka.stream.alpakka.dynamodb.scaladsl.DynamoDb
import com.amazonaws.services.dynamodbv2.model._
import scala.collection.JavaConverters._
import scala.concurrent.Future
/**
* A class to operate with DynamoDB using aws sdk and akka streams
* It does not use scanamo alpakka
*
* @param tableName table name
* @param materializer actor materializer (for akka streams)
*/
class DatabaseOperation(tableName: String)(implicit val materializer: ActorMaterializer) {
def createTable(
keySchemaElements: Seq[KeySchemaElement],
attributeDefinitions: Seq[AttributeDefinition],
provisionedThroughput: ProvisionedThroughput
): Future[CreateTableResult] = {
DynamoDb.single(
new CreateTableRequest()
.withTableName(tableName)
.withKeySchema(keySchemaElements.asJava)
.withAttributeDefinitions(attributeDefinitions.asJava)
.withProvisionedThroughput(provisionedThroughput)
)
}
def getItem(keyToGet: Map[String, AttributeValue]): Future[GetItemResult] = {
DynamoDb.single(
new GetItemRequest()
.withTableName(tableName)
.withKey(keyToGet.asJava)
)
}
def putItem(itemToPut:Map[String,AttributeValue]): Future[PutItemResult] = {
DynamoDb.single(
new PutItemRequest()
.withTableName(tableName)
.withItem(itemToPut.asJava)
)
}
And in the above code: –
- keySchemaElements: – defines
Partition key
andSort key
. - attributeDefinitions: – defines datatype of attributes that are used as a key for the table including both partition key and sort key.
- provisionedThroughput: – defines AWS DynamoDB Provisioned Throughput.
4. Initialise ActorSystem
and DatabaseOperation
: –
Initialize ActorSystem
, ActorMaterializer
, ExecutionContext
and DatabaseOperation
class
implicit val system : ActorSystem = ActorSystem("akka-alpakka-dynamoDB")
implicit val materializer : ActorMaterializer = ActorMaterializer()
implicit val ec : ExecutionContext = system.dispatcher
val tableName : String = "User"
val databaseOperation : DatabaseOperation = new DatabaseOperation(tableName)
5. Create table and define key schema
So, to create table in dynamoDb we need to specify key schema and provisioned throughput
lazy val keySchemaElements: List[KeySchemaElement] = List(
new KeySchemaElement("email", KeyType.HASH)
)
lazy val attributeDefinitions: List[AttributeDefinition] = List(
new AttributeDefinition("email", "S")
)
lazy val provisionedThroughput: ProvisionedThroughput =
new ProvisionedThroughput(20L, 20L)
databaseOperation.createTable(keySchemaElements, attributeDefinitions, provisionedThroughput).onComplete
{
case Success(createTableResult : CreateTableResult) => println(createTableResult)
case Failure(exception) => exception.printStackTrace()
}
6. Insertion and Querying data
To insert data, we need to create a map containing attribute and its value
val item : Map[String, AttributeValue] = Map (
"email" -> new AttributeValue("example.example@example.com"),
"name" -> new AttributeValue("example")
)
databaseOperation.putItem(item).onComplete
{
case Success(putItemResult) => println(putItemResult)
case Failure(exception) => exception.printStackTrace()
}
Now, for querying data: –
val getItem : Map[String, AttributeValue] = Map (
"email" -> new AttributeValue("example.example@example.com")
)
import scala.collection.JavaConverters._
databaseOperation.getItem(getItem).onComplete
{
case Success(getItemResult) => getItemResult.getItem.asScala.foreach(println)
case Failure(exception) => exception.printStackTrace()
}
So, from above we learn how to use Akka Stream with DynamoDB.
To setup local DynamoDB, Click Here
.