热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

AWS学习笔记(七)集成SQS和Lambda

利用JavaSDK创建、删除Queue,发送、接收、删除Message。创建、编写、测试Lambd
本文介绍了集成SQS和Lambda的方法,代码基于JAVA SDK。

POM配置
  
      
        aws-java-sdk-sqs  
        com.amazonaws  
      
      
        com.amazonaws  
        aws-lambda-java-core  
        1.2.0  
      
      
        com.amazonaws  
        aws-lambda-java-events  
        2.0.2  
      
      
        com.amazonaws  
        aws-lambda-java-log4j2  
        1.1.0  
      
  

  
      
          
            com.amazonaws  
            aws-java-sdk-bom  
            1.11.272  
            pom  
            import  
          
      
SQS

中国区目前仅支持标准Queue,不支持FIFO Queue,下面代码以标准Queue为例,演示了创建Queue、配置Dead Letter Queue、发送Message、接收Message、删除Message、删除Queue的方法:

import com.amazonaws.regions.Regions;  
import com.amazonaws.services.sqs.AmazonSQS;  
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;  
import com.amazonaws.services.sqs.model.*;  

import java.util.*;  

public class SqsUtil {  
    private static final String ARN_ATTRIBUTE_NAME = "QueueArn";  
    private static AmazonSQS sqs;  

    static {  
        sqs = AmazonSQSClientBuilder.standard().withRegion(Regions.CN_NORTH_1).build();  
    }  

    private SqsUtil() {  
    }  

    public static String createQueue(String queueName) {  
        System.out.println("Creating a new SQS queue called " + queueName);  

        CreateQueueRequest createQueueRequest = new CreateQueueRequest(queueName);  
        Map attributes = new HashMap<>();  
        // 接收消息等待时间  
        attributes.put("ReceiveMessageWaitTimeSeconds", "5");  
        createQueueRequest.withAttributes(attributes);  

        return sqs.createQueue(createQueueRequest).getQueueUrl();  
    }  

    public static String createDeadLetterQueue(String queueName) {  
        String queueUrl = createQueue(queueName);  
        // 配置Dead Letter Queue时使用ARN  
        return getQueueArn(queueUrl);  
    }  

    public static void configDeadLetterQueue(String queueUrl, String deadLetterQueueArn) {  
        System.out.println("Config dead letter queue for " + queueUrl);  

        SetQueueAttributesRequest queueAttributes = new SetQueueAttributesRequest();
                Map attributes = new HashMap<>();  
        // 最大接收次数设为5,当接收次数超过5后,消息未被处理和删除将被转到死信队列  
        attributes.put("RedrivePolicy", "{\"maxReceiveCount\":\"5\", \"deadLetterTargetArn\":\"" + deadLetterQueueArn + "\"}");  
        queueAttributes.setAttributes(attributes);  
        queueAttributes.setQueueUrl(queueUrl);  

        sqs.setQueueAttributes(queueAttributes);  
    }  

    public static void sendMessage(String queueUrl, String message) {  
        System.out.println("Sending a message to " + queueUrl);  

        SendMessageRequest request = new SendMessageRequest();  
        request.withQueueUrl(queueUrl);  
        request.withMessageBody(message);  
        Map messageAttributes = new HashMap<>();  
        // 添加消息属性,注意必须要有DataType和Value  
        messageAttributes.put("Hello", new MessageAttributeValue().withDataType("String").withStringValue("COCO"));  
        request.withMessageAttributes(messageAttributes);  

        sqs.sendMessage(request);  
    }  

    public static void receiveMessages(String queueUrl) {  
        System.out.println("Receiving messages from " + queueUrl);  

        ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(queueUrl);  
        receiveMessageRequest.setMaxNumberOfMessages(5);  
        receiveMessageRequest.withWaitTimeSeconds(10);  
        // 要添加MessageAttributeNames,否则不能接收  
        receiveMessageRequest.setMessageAttributeNames(Arrays.asList("Hello"));  

        List messages = sqs.receiveMessage(receiveMessageRequest).getMessages();  
        for (Message message : messages) {  
            System.out.println("Message: " + message.getBody());  
            for (Map.Entry entry : message.getMessageAttributes().entrySet()) {  
                System.out.println("  Attribute");  
                System.out.println("    Name:  " + entry.getKey());  
                System.out.println("    Value: " + entry.getValue().getStringValue());  
            }

                        // Delete message  
            System.out.println("Deleting a message.");  
            String messageReceiptHandle = message.getReceiptHandle();  
            sqs.deleteMessage(new DeleteMessageRequest(queueUrl, messageReceiptHandle));  
        }  
    }  

    public static void deleteQueue(String queueUrl) {  
        System.out.println("Deleting the queue " + queueUrl);  
        sqs.deleteQueue(new DeleteQueueRequest(queueUrl));  
    }  

    public static String getQueueArn(String queueUrl) {  
        List attributes = new ArrayList<>();  
        attributes.add(ARN_ATTRIBUTE_NAME);  
        GetQueueAttributesResult queueAttributes = sqs.getQueueAttributes(queueUrl, attributes);  
        return queueAttributes.getAttributes().get(ARN_ATTRIBUTE_NAME);  
    }  

}

在运行上面代码前,要在{HOME}/.aws目录下配置credentials,用户要有SQS权限:
[default]
aws_access_key_id = AAAAAAAAAAAAAA
aws_secret_access_key = MXXXXXXXXXXXXXXXXXXXXXX9

测试一下:

// 创建Dead Letter Queue  
String deadLetterQueueArn = createDeadLetterQueue("DeadLetterQueue");  
// 创建Task Queue  
String queueUrl = createQueue("TaskQueue");  
// 配置Dead Letter Queue  
configDeadLetterQueue(queueUrl, deadLetterQueueArn);  
// 发送Message  
for (int i = 0; i <6; i++) {  
    sendMessage(queueUrl, "Hello COCO " + i);  
}  
// 接收Message  
receiveMessages(queueUrl);  
// 删除Queue  
deleteQueue(queueUrl);
Lambda

Function Code

Lambda函数定义支持两种方式 :

  • 实现预定义接口RequestStreamHandler 或 RequestHandler
    import com.amazonaws.services.lambda.runtime.RequestHandler;  
    import com.amazonaws.services.lambda.runtime.Context;
    public class Hello implements RequestHandler {  
    // Request,Response为自定义的类型  
    public Response handleRequest(Request request, Context context) {  
        String greetingString = String.format("Hello %s %s.", request.firstName, request.lastName);  
        return new Response(greetingString);  
    }  
    }
  • 不实现任何接口,直接定义处理程序方法
    outputType handler-name(inputType input, Context context) {  
    ...  
    }

inputType 和 outputType 可为以下类型之一:

  • Java 基元类型(如 String 或 int)。
  • aws-lambda-java-events 库中的预定义 AWS 事件类型。 如S3Event。
  • 自己的 POJO 类。AWS Lambda 会根据该 POJO 类型自动序列化和反序列化输入、输出 JSON。

如不需要,可以省略处理程序方法签名中的 Context 对象。

先编写一个简单的测试用例接收SQS消息,输入参数input为Queue URL:

import com.amazonaws.services.lambda.runtime.Context;  
import com.amazonaws.services.lambda.runtime.LambdaLogger;  
import com.amazonaws.services.lambda.runtime.RequestHandler;  

public class Hello implements RequestHandler {  
    @Override  
    public String handleRequest(String input, Context context) {  
        LambdaLogger logger = context.getLogger();  
        logger.log("received : " + input);  
        SqsUtil.receiveMessages(input);  
        return "success";  
    }  
}

程序编写完了,如何放入到Lambda函数中呢?需要打成jar包,且须包含依赖包,pom中增加shade插件:

  
      
          
            org.apache.maven.plugins  
            maven-shade-plugin  
            3.1.0  
              
                false  
              
              
                  
                    package  
                      
                        shade  
                      
                  
              
          
      

创建Lambda Function

下面通过Web Console创建Lambda Function
技术分享图片
注意:role要有lambda、Cloudwatch Logs、SQS权限。

然后上传jar包,配置Handler
技术分享图片
再调整一下内存配置和超时参数,保存。
技术分享图片
配置测试参数,测试一下先:
技术分享图片
执行成功输出:
技术分享图片

Lambda触发器

下面修改一下代码,输入参数类型改为ScheduledEvent,将使用触发器CloudWatch Events调用。

import com.amazonaws.services.lambda.runtime.Context;  
import com.amazonaws.services.lambda.runtime.LambdaLogger;  
import com.amazonaws.services.lambda.runtime.RequestHandler;  
import com.amazonaws.services.lambda.runtime.events.ScheduledEvent;  

public class Hello implements RequestHandler {  
    @Override  
    public String handleRequest(ScheduledEvent input, Context context) {  
        LambdaLogger logger = context.getLogger();  
        logger.log("received : " + input.toString() + "\n");  
        SqsUtil.receiveMessages("https://sqs.cn-north-1.amazonaws.com.cn/891245999999/TaskQueue");  
        return "success";  
    }  
} 

上传后,同样先手工测试一下,这次选择模板Scheduled Event
技术分享图片
测试成功后,配置CloudWatch Events触发器,Rule Type选择Schedule expression:
技术分享图片
保存后就可以定时调用lambda了,O。

Integrate SQS and Lambda: serverless architecture for asynchronous workloads
Amazon Simple Queue Service Developer Guide
AWS Lambda Developer Guide
Programming Model for Authoring Lambda Functions in Java
AWS SDK for Java Developer Guide
Schedule Expressions Using Rate or Cron
AWS 视频中心
AWS微服务和无服务器架构入门
快速理解AWS Lambda,轻松构建Serverless后台
用无服务器应用模型构建AWS Lambda应用
如何通过运行无服务器来满足企业需求
无服务器架构设计模式和最佳实践

AWS学习笔记(七)--集成SQS和Lambda


推荐阅读
  • 本文介绍了使用Java实现大数乘法的分治算法,包括输入数据的处理、普通大数乘法的结果和Karatsuba大数乘法的结果。通过改变long类型可以适应不同范围的大数乘法计算。 ... [详细]
  • 本文讨论了如何优化解决hdu 1003 java题目的动态规划方法,通过分析加法规则和最大和的性质,提出了一种优化的思路。具体方法是,当从1加到n为负时,即sum(1,n)sum(n,s),可以继续加法计算。同时,还考虑了两种特殊情况:都是负数的情况和有0的情况。最后,通过使用Scanner类来获取输入数据。 ... [详细]
  • 本文介绍了OC学习笔记中的@property和@synthesize,包括属性的定义和合成的使用方法。通过示例代码详细讲解了@property和@synthesize的作用和用法。 ... [详细]
  • Mac OS 升级到11.2.2 Eclipse打不开了,报错Failed to create the Java Virtual Machine
    本文介绍了在Mac OS升级到11.2.2版本后,使用Eclipse打开时出现报错Failed to create the Java Virtual Machine的问题,并提供了解决方法。 ... [详细]
  • 在说Hibernate映射前,我们先来了解下对象关系映射ORM。ORM的实现思想就是将关系数据库中表的数据映射成对象,以对象的形式展现。这样开发人员就可以把对数据库的操作转化为对 ... [详细]
  • 1,关于死锁的理解死锁,我们可以简单的理解为是两个线程同时使用同一资源,两个线程又得不到相应的资源而造成永无相互等待的情况。 2,模拟死锁背景介绍:我们创建一个朋友 ... [详细]
  • 本文介绍了通过ABAP开发往外网发邮件的需求,并提供了配置和代码整理的资料。其中包括了配置SAP邮件服务器的步骤和ABAP写发送邮件代码的过程。通过RZ10配置参数和icm/server_port_1的设定,可以实现向Sap User和外部邮件发送邮件的功能。希望对需要的开发人员有帮助。摘要长度:184字。 ... [详细]
  • Java验证码——kaptcha的使用配置及样式
    本文介绍了如何使用kaptcha库来实现Java验证码的配置和样式设置,包括pom.xml的依赖配置和web.xml中servlet的配置。 ... [详细]
  • 本文介绍了lua语言中闭包的特性及其在模式匹配、日期处理、编译和模块化等方面的应用。lua中的闭包是严格遵循词法定界的第一类值,函数可以作为变量自由传递,也可以作为参数传递给其他函数。这些特性使得lua语言具有极大的灵活性,为程序开发带来了便利。 ... [详细]
  • 基于layUI的图片上传前预览功能的2种实现方式
    本文介绍了基于layUI的图片上传前预览功能的两种实现方式:一种是使用blob+FileReader,另一种是使用layUI自带的参数。通过选择文件后点击文件名,在页面中间弹窗内预览图片。其中,layUI自带的参数实现了图片预览功能。该功能依赖于layUI的上传模块,并使用了blob和FileReader来读取本地文件并获取图像的base64编码。点击文件名时会执行See()函数。摘要长度为169字。 ... [详细]
  • HDU 2372 El Dorado(DP)的最长上升子序列长度求解方法
    本文介绍了解决HDU 2372 El Dorado问题的一种动态规划方法,通过循环k的方式求解最长上升子序列的长度。具体实现过程包括初始化dp数组、读取数列、计算最长上升子序列长度等步骤。 ... [详细]
  • 本文详细介绍了Linux中进程控制块PCBtask_struct结构体的结构和作用,包括进程状态、进程号、待处理信号、进程地址空间、调度标志、锁深度、基本时间片、调度策略以及内存管理信息等方面的内容。阅读本文可以更加深入地了解Linux进程管理的原理和机制。 ... [详细]
  • 《数据结构》学习笔记3——串匹配算法性能评估
    本文主要讨论串匹配算法的性能评估,包括模式匹配、字符种类数量、算法复杂度等内容。通过借助C++中的头文件和库,可以实现对串的匹配操作。其中蛮力算法的复杂度为O(m*n),通过随机取出长度为m的子串作为模式P,在文本T中进行匹配,统计平均复杂度。对于成功和失败的匹配分别进行测试,分析其平均复杂度。详情请参考相关学习资源。 ... [详细]
  • 动态规划算法的基本步骤及最长递增子序列问题详解
    本文详细介绍了动态规划算法的基本步骤,包括划分阶段、选择状态、决策和状态转移方程,并以最长递增子序列问题为例进行了详细解析。动态规划算法的有效性依赖于问题本身所具有的最优子结构性质和子问题重叠性质。通过将子问题的解保存在一个表中,在以后尽可能多地利用这些子问题的解,从而提高算法的效率。 ... [详细]
  • 高质量SQL书写的30条建议
    本文提供了30条关于优化SQL的建议,包括避免使用select *,使用具体字段,以及使用limit 1等。这些建议是基于实际开发经验总结出来的,旨在帮助读者优化SQL查询。 ... [详细]
author-avatar
手机用户2502906317
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有