热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

使用SpringBoot将RestClient异步到DynamoDB

1.Overview从Spring框架5.0和SpringBoot2.0开始,该框架提供对异步编程的支持,从2.0版本开始的AWS开发工具包也是如此。在

1. Overview

从Spring框架5.0和Spring Boot 2.0开始,该框架提供对异步编程的支持,从2.0版本开始的AWS开发工具包也是如此。

在本文中,我将通过构建简单的反应式REST应用程序来探索使用异步DynamoDB API和Spring Webflux。 假设我们需要处理HTTP请求以检索或存储一些Event(id:string,body:string)。 事件将存储在DynamoDB中。

It might be easier to simply look at the code on Github and follow it there.

2. Dependencies

让我们从WebFlux和DynamoDB SDK的Maven依赖关系开始

org.springframework.bootspring-boot-starter-webflux2.2.4.RELEASEsoftware.amazon.awssdkdynamodb2.10.65

3. DynamoDB
3.1 Spring Configuration

一个简单的配置就是我们建立了与DynamoDB的连接。 为了测试目的,我们需要指定dynamoEndpoint在Forreal应用程序中,我们需要指定aws区域。

@Configuration
public class AppConfig {@Value("${aws.accessKey}")String accessKey;@Value("${aws.secretKey}")String secretKey;@Value("${dynamodb.endpoint:}")String dynamoEndpoint;@BeanAwsBasicCredentials awsBasicCredentials(){return AwsBasicCredentials.create(accessKey, secretKey);}@BeanDynamoDbAsyncClient dynamoDbAsyncClient(AwsBasicCredentials awsBasicCredentials){DynamoDbAsyncClientBuilder clientBuilder = DynamoDbAsyncClient.builder();clientBuilder.credentialsProvider(StaticCredentialsProvider.create(awsBasicCredentials));if(!dynamoEndpoint.isEmpty()){clientBuilder.endpointOverride(URI.create(dynamoEndpoint));}return clientBuilder.build();}
}

application.yaml带有连接细节

aws:accessKey: anysecretKey: any
dynamodb:endpoint: http://localhost:8000/

3.2 Reactive DynamoDB Service

Unfortunately, second version of AWS SDK doesn’t have support for DynamoDBMapper yet(you can track mapper’s readiness here), so table creation, sending requests and parsing responses need to be done by “low level” API.

在DynamoDbService我们会:

  • 如果不存在则创建表Implement methods for saving and retrieving event

@Service
public class DynamoDbService {public static final String TABLE_NAME &#61; "events";public static final String ID_COLUMN &#61; "id";public static final String BODY_COLUMN &#61; "body";final DynamoDbAsyncClient client;&#64;Autowiredpublic DynamoDbService(DynamoDbAsyncClient client) {this.client &#61; client;}//Creating table on startup if not exists&#64;PostConstructpublic void createTableIfNeeded() throws ExecutionException, InterruptedException {ListTablesRequest request &#61; ListTablesRequest.builder().exclusiveStartTableName(TABLE_NAME).build();CompletableFuture listTableResponse &#61; client.listTables(request);CompletableFuture createTableRequest &#61; listTableResponse.thenCompose(response -> {boolean tableExist &#61; response.tableNames().contains(TABLE_NAME);if (!tableExist) {return createTable();} else {return CompletableFuture.completedFuture(null);}});//Wait in synchronous manner for table creationcreateTableRequest.get();}public CompletableFuture saveEvent(Event event) {Map item &#61; new HashMap<>();item.put(ID_COLUMN, AttributeValue.builder().s(event.getUuid()).build());item.put(BODY_COLUMN, AttributeValue.builder().s(event.getBody()).build());PutItemRequest putItemRequest &#61; PutItemRequest.builder().tableName(TABLE_NAME).item(item).build();return client.putItem(putItemRequest);}public CompletableFuture getEvent(String id) {Map key &#61; new HashMap<>();key.put(ID_COLUMN, AttributeValue.builder().s(id).build());GetItemRequest getRequest &#61; GetItemRequest.builder().tableName(TABLE_NAME).key(key).attributesToGet(BODY_COLUMN).build();return client.getItem(getRequest).thenApply(item -> {if (!item.hasItem()) {return null;} else {Map itemAttr &#61; item.item();String body &#61; itemAttr.get(BODY_COLUMN).s();return new Event(id, body);}});}private CompletableFuture createTable() {CreateTableRequest request &#61; CreateTableRequest.builder().tableName(TABLE_NAME).keySchema(KeySchemaElement.builder().attributeName(ID_COLUMN).keyType(KeyType.HASH).build()).attributeDefinitions(AttributeDefinition.builder().attributeName(ID_COLUMN).attributeType(ScalarAttributeType.S).build()).billingMode(BillingMode.PAY_PER_REQUEST).build();return client.createTable(request);}
}

4. Reactive REST Controller

一个简单的控制器&#xff0c;具有用于通过id检索事件的GET方法和用于在DynamoDB中保存事件的POST方法。 我们可以通过两种方式来做到这一点-用批注实现或摆脱批注并以功能性方式实现。这不会对性能产生影响&#xff0c;在大多数情况下&#xff0c;这完全取决于个人喜好。

4.1 Annotated Controllers

&#64;RestController
&#64;RequestMapping("/event")
public class AnnotatedController {final DynamoDbService dynamoDbService;public AnnotatedController(DynamoDbService dynamoDbService) {this.dynamoDbService &#61; dynamoDbService;}&#64;GetMapping(value &#61; "/{eventId}", produces &#61; MediaType.APPLICATION_JSON_VALUE)public Mono getEvent(&#64;PathVariable String eventId) {CompletableFuture eventFuture &#61; dynamoDbService.getEvent(eventId);return Mono.fromCompletionStage(eventFuture);}&#64;PostMapping(consumes &#61; MediaType.APPLICATION_JSON_VALUE)public void saveEvent(&#64;RequestBody Event event) {dynamoDbService.saveEvent(event);}
}

4.2 Functional Endpoints

这是一个轻量级的函数编程模型&#xff0c;其中的函数用于路由和处理请求。

&#64;Configuration
public class HttpRouter {&#64;Beanpublic RouterFunction eventRouter(DynamoDbService dynamoDbService) {EventHandler eventHandler &#61; new EventHandler(dynamoDbService);return RouterFunctions.route(GET("/eventfn/{id}").and(accept(APPLICATION_JSON)), eventHandler::getEvent).andRoute(POST("/eventfn").and(accept(APPLICATION_JSON)).and(contentType(APPLICATION_JSON)), eventHandler::saveEvent);}static class EventHandler {private final DynamoDbService dynamoDbService;public EventHandler(DynamoDbService dynamoDbService) {this.dynamoDbService &#61; dynamoDbService;}Mono getEvent(ServerRequest request) {String eventId &#61; request.pathVariable("id");CompletableFuture eventGetFuture &#61; dynamoDbService.getEvent(eventId);Mono eventMono &#61; Mono.fromFuture(eventGetFuture);return ServerResponse.ok().body(eventMono, Event.class);}Mono saveEvent(ServerRequest request) {Mono eventMono &#61; request.bodyToMono(Event.class);eventMono.map(dynamoDbService::saveEvent);return ServerResponse.ok().build();}}
}

5. Spring DynamoDB Integration Test
5.1 Maven dependencies

为了与DynamoDB运行集成测试&#xff0c;我们需要DynamoDBLocal&#xff0c;它不是真正的DynamoDB&#xff0c;而是在其之上具有实现的DynamoDB接口的SQLite。

com.amazonawsDynamoDBLocal1.12.0test
org.apache.maven.pluginsmaven-dependency-plugin2.10copytest-compilecopy-dependenciestestso,dll,dylib${project.basedir}/target/native-libs
dynamodb-local-oregonDynamoDB Local Release Repositoryhttps://s3-us-west-2.amazonaws.com/dynamodb-local/release

5.2 DynamoDB server

现在我们需要在测试运行之前启动DynamoDB。 我更喜欢将其作为JUnit类规则来执行&#xff0c;但我们也可以将其作为spring bean来执行。

public class LocalDynamoDbRule extends ExternalResource {protected DynamoDBProxyServer server;public LocalDynamoDbRule() {//here we set the path from "outputDirectory" of maven-dependency-pluginSystem.setProperty("sqlite4java.library.path", "target/native-libs");}&#64;Overrideprotected void before() throws Exception {this.server &#61; ServerRunner.createServerFromCommandLineArgs(new String[]{"-inMemory", "-port", "8000"});server.start();}&#64;Overrideprotected void after() {this.stopUnchecked(server);}protected void stopUnchecked(DynamoDBProxyServer dynamoDbServer) {try {dynamoDbServer.stop();} catch (Exception e) {throw new RuntimeException(e);}}
}

5.3 Running test

现在&#xff0c;我们可以创建一个集成测试&#xff0c;并通过id和save事件测试get事件。

&#64;RunWith(SpringRunner.class)
&#64;SpringBootTest(webEnvironment &#61; SpringBootTest.WebEnvironment.RANDOM_PORT)
public class IntegrationTest {&#64;ClassRulepublic static LocalDynamoDbRule dynamoDbRule &#61; new LocalDynamoDbRule();&#64;Autowiredprivate WebTestClient webTestClient;&#64;Testpublic void getEvent() {// Create a GET request to test an endpointwebTestClient.get().uri("/event/1").accept(MediaType.APPLICATION_JSON).exchange()// and use the dedicated DSL to test assertions against the response.expectStatus().isOk().expectBody(String.class).isEqualTo(null);}&#64;Testpublic void saveEvent() throws InterruptedException {Event event &#61; new Event("10", "event");webTestClient.post().uri("/event/").body(BodyInserters.fromValue(event)).exchange().expectStatus().isOk();Thread.sleep(1500);webTestClient.get().uri("/event/10").accept(MediaType.APPLICATION_JSON).exchange().expectStatus().isOk().expectBody(Event.class).isEqualTo(event);}
}

6. Docker

在这里&#xff0c;我们将准备在docker中运行的应用程序&#xff0c;以便可以将其部署到AWS。

小提示&#xff1a;从Java 10开始&#xff0c;您可以指定JVM将使用多少内存&#xff0c;具体取决于容器内存。 -XX&#xff1a;MaxRAMPercentage &#61; 75.0意味着JVM不会使用超过75&#xff05;的容器内存。

Dockerfile

# Use our standard java12 baseimage
FROM openjdk:12-alpine# Copy the artifact into the container
COPY target/dynamodb-spring-*-exec.jar /srv/service.jar# Run the artifact and expose the default port
WORKDIR /srvENTRYPOINT [ "java", \"-XX:&#43;UnlockExperimentalVMOptions", \"-XX:&#43;ExitOnOutOfMemoryError", \"-XX:MaxRAMPercentage&#61;75.0", \"-Djava.security.egd&#61;file:/dev/./urandom", \"-jar", "service.jar", \"--spring.profiles.active&#61;prod" ]EXPOSE 8080

构建Docker容器本身dockerbuild-tspring-dynamo。 Alsolet&#39;sseewhatwasgeneratedbysudodockerimagels

REPOSITORY TAG IMAGE ID CREATED SIZE
spring-dynamo latest a974d880400e About a minute ago 364MB
openjdk 12-alpine 0c68e7c5b7a0 12 months ago 339MB


终于&#xff0c;我们的poc准备好了&#xff01;

快乐的编码:)

from: https://dev.to//byegor/spring-webflux-with-async-dynamodb-4ig0



推荐阅读
author-avatar
吕兵涛_836
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有