-
Notifications
You must be signed in to change notification settings - Fork 12
/
Copy pathDynamoDBEcho.scala
173 lines (143 loc) · 6.29 KB
/
DynamoDBEcho.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
package alpakka.dynamodb
import com.github.pjfanning.pekkohttpspi.PekkoHttpClient
import org.apache.pekko.NotUsed
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.connectors.dynamodb.scaladsl.DynamoDb
import org.apache.pekko.stream.scaladsl.{FlowWithContext, Sink, Source}
import org.slf4j.LoggerFactory
import software.amazon.awssdk.auth.credentials.{AwsBasicCredentials, StaticCredentialsProvider}
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration
import software.amazon.awssdk.core.internal.retry.SdkDefaultRetrySetting
import software.amazon.awssdk.core.retry.RetryPolicy
import software.amazon.awssdk.core.retry.backoff.BackoffStrategy
import software.amazon.awssdk.core.retry.conditions.RetryCondition
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient
import software.amazon.awssdk.services.dynamodb.model.*
import java.net.URI
import java.util
import java.util.UUID
import scala.concurrent.{ExecutionContextExecutor, Future}
import scala.jdk.CollectionConverters.*
import scala.util.Try
class DynamoDBEcho(urlWithMappedPort: URI, accessKey: String, secretKey: String, region: String) {
private val logger = LoggerFactory.getLogger(this.getClass)
implicit val system: ActorSystem = ActorSystem("DynamoDBEcho")
implicit val executionContext: ExecutionContextExecutor = system.dispatcher
private val testTableName = "testTable"
val credentialsProvider: StaticCredentialsProvider = StaticCredentialsProvider.create(AwsBasicCredentials.create(accessKey, secretKey))
implicit val client: DynamoDbAsyncClient = createAsyncClient()
def run(noOfItems: Int): Future[Int] = {
for {
_ <- createTable()
_ <- writeItems(noOfItems)
result <- readItems(noOfItems)
} yield result
}
// Create a table and read description of this table
private def createTable() = {
val source: Source[DescribeTableResponse, NotUsed] = Source
.single(createTableRequest())
.via(DynamoDb.flow(parallelism = 1))
.map(response => DescribeTableRequest.builder().tableName(response.tableDescription.tableName).build())
.via(DynamoDb.flow(parallelism = 1))
source
.wireTap(descTableResponse => logger.info(s"Successfully created table: ${descTableResponse.table.tableName}"))
.runWith(Sink.ignore)
}
private def createTableRequest(): CreateTableRequest = {
val attributeDefinitions = util.Arrays.asList(
AttributeDefinition.builder()
.attributeName("Id")
.attributeType(ScalarAttributeType.S)
.build()
)
val keySchema = util.Arrays.asList(
KeySchemaElement.builder()
.attributeName("Id")
.keyType(KeyType.HASH)
.build()
)
CreateTableRequest.builder()
.tableName(testTableName)
.keySchema(keySchema)
.attributeDefinitions(attributeDefinitions)
.provisionedThroughput(ProvisionedThroughput.builder()
.readCapacityUnits(5L)
.writeCapacityUnits(5L)
.build())
.build()
}
private def writeItems(noOfItems: Int) = {
logger.info(s"About to write $noOfItems items...")
case class RequestContext(tableName: String, requestId: String)
val sourceWrite = Source(1 to noOfItems)
.map(item => {
val requestId = UUID.randomUUID().toString
val request = PutItemRequest.builder().tableName(testTableName).item(Map(
"Id" -> AttributeValue.builder().s(item.toString).build(),
"att1" -> AttributeValue.builder().s(s"att1-$item").build(),
"att2" -> AttributeValue.builder().n(s"$item").build()
).asJava).build()
(request, RequestContext(testTableName, requestId))
})
val flowWrite: FlowWithContext[PutItemRequest, RequestContext, Try[PutItemResponse], RequestContext, NotUsed] =
DynamoDb.flowWithContext(parallelism = 1)
sourceWrite
.via(flowWrite)
.map((response: (Try[PutItemResponse], RequestContext)) => {
val status = response._1.get.sdkHttpResponse().statusCode()
logger.info(s"Successfully written item. Http response: $status")
})
.runWith(Sink.ignore)
}
private def readItems(noOfItems: Int) = {
logger.info(s"About to read 2nd half of all items...")
val filterExpression = "#att2 > :val"
val expressionAttrNames = new java.util.HashMap[String, String]()
expressionAttrNames.put("#att2", "att2")
val expressionAttrValues = new java.util.HashMap[String, AttributeValue]()
expressionAttrValues.put(":val", AttributeValue.builder().n((noOfItems / 2).toString).build())
val scanRequest = ScanRequest.builder()
.tableName(testTableName) // This hangs when no table is available
.filterExpression(filterExpression)
.expressionAttributeNames(expressionAttrNames)
.expressionAttributeValues(expressionAttrValues)
.build()
val scanPageInFlow: Source[ScanResponse, NotUsed] =
Source
.single(scanRequest)
.via(DynamoDb.flowPaginated())
scanPageInFlow.map { scanResponse =>
val count = scanResponse.scannedCount()
val resultCount = scanResponse.items().size()
logger.info(s"Successfully read $resultCount/$count items")
scanResponse.items().forEach(item => logger.info(s"Item: $item"))
resultCount
}.runWith(Sink.head)
}
private def createAsyncClient() = {
val client = DynamoDbAsyncClient
.builder()
.endpointOverride(urlWithMappedPort)
.region(Region.of(region))
.credentialsProvider(credentialsProvider)
.httpClient(PekkoHttpClient.builder().withActorSystem(system).build())
// https://pekko.apache.org/docs/pekko-connectors/current/aws-shared-configuration.html
// https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/core/retry/RetryPolicy.html
.overrideConfiguration(
ClientOverrideConfiguration
.builder()
.retryPolicy(
RetryPolicy.builder
.backoffStrategy(BackoffStrategy.defaultStrategy)
.throttlingBackoffStrategy(BackoffStrategy.defaultThrottlingStrategy)
.numRetries(SdkDefaultRetrySetting.defaultMaxAttempts)
.retryCondition(RetryCondition.defaultRetryCondition)
.build)
.build())
.build()
system.registerOnTermination(client.close())
client
}
}