aws-java-sdk-sqs
com.amazonaws
com.amazonaws
aws-lambda-java-core
1.2.0
com.amazonaws
aws-lambda-java-events
2.0.2
com.amazonaws
aws-lambda-java-log4j2
1.1.0
com.amazonaws
aws-java-sdk-bom
1.11.272
pom
import
SQS
中国区目前仅支持标准Queue,不支持FIFO Queue,下面代码以标准Queue为例,演示了创建Queue、配置Dead Letter Queue、发送Message、接收Message、删除Message、删除Queue的方法:
import com.amazonaws.regions.Regions;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
import com.amazonaws.services.sqs.model.*;
import java.util.*;
public class SqsUtil {
private static final String ARN_ATTRIBUTE_NAME = "QueueArn";
private static AmazonSQS sqs;
static {
sqs = AmazonSQSClientBuilder.standard().withRegion(Regions.CN_NORTH_1).build();
}
private SqsUtil() {
}
public static String createQueue(String queueName) {
System.out.println("Creating a new SQS queue called " + queueName);
CreateQueueRequest createQueueRequest = new CreateQueueRequest(queueName);
Map attributes = new HashMap<>();
// 接收消息等待时间
attributes.put("ReceiveMessageWaitTimeSeconds", "5");
createQueueRequest.withAttributes(attributes);
return sqs.createQueue(createQueueRequest).getQueueUrl();
}
public static String createDeadLetterQueue(String queueName) {
String queueUrl = createQueue(queueName);
// 配置Dead Letter Queue时使用ARN
return getQueueArn(queueUrl);
}
public static void configDeadLetterQueue(String queueUrl, String deadLetterQueueArn) {
System.out.println("Config dead letter queue for " + queueUrl);
SetQueueAttributesRequest queueAttributes = new SetQueueAttributesRequest();
Map attributes = new HashMap<>();
// 最大接收次数设为5,当接收次数超过5后,消息未被处理和删除将被转到死信队列
attributes.put("RedrivePolicy", "{\"maxReceiveCount\":\"5\", \"deadLetterTargetArn\":\"" + deadLetterQueueArn + "\"}");
queueAttributes.setAttributes(attributes);
queueAttributes.setQueueUrl(queueUrl);
sqs.setQueueAttributes(queueAttributes);
}
public static void sendMessage(String queueUrl, String message) {
System.out.println("Sending a message to " + queueUrl);
SendMessageRequest request = new SendMessageRequest();
request.withQueueUrl(queueUrl);
request.withMessageBody(message);
Map messageAttributes = new HashMap<>();
// 添加消息属性,注意必须要有DataType和Value
messageAttributes.put("Hello", new MessageAttributeValue().withDataType("String").withStringValue("COCO"));
request.withMessageAttributes(messageAttributes);
sqs.sendMessage(request);
}
public static void receiveMessages(String queueUrl) {
System.out.println("Receiving messages from " + queueUrl);
ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(queueUrl);
receiveMessageRequest.setMaxNumberOfMessages(5);
receiveMessageRequest.withWaitTimeSeconds(10);
// 要添加MessageAttributeNames,否则不能接收
receiveMessageRequest.setMessageAttributeNames(Arrays.asList("Hello"));
List messages = sqs.receiveMessage(receiveMessageRequest).getMessages();
for (Message message : messages) {
System.out.println("Message: " + message.getBody());
for (Map.Entry entry : message.getMessageAttributes().entrySet()) {
System.out.println(" Attribute");
System.out.println(" Name: " + entry.getKey());
System.out.println(" Value: " + entry.getValue().getStringValue());
}
// Delete message
System.out.println("Deleting a message.");
String messageReceiptHandle = message.getReceiptHandle();
sqs.deleteMessage(new DeleteMessageRequest(queueUrl, messageReceiptHandle));
}
}
public static void deleteQueue(String queueUrl) {
System.out.println("Deleting the queue " + queueUrl);
sqs.deleteQueue(new DeleteQueueRequest(queueUrl));
}
public static String getQueueArn(String queueUrl) {
List attributes = new ArrayList<>();
attributes.add(ARN_ATTRIBUTE_NAME);
GetQueueAttributesResult queueAttributes = sqs.getQueueAttributes(queueUrl, attributes);
return queueAttributes.getAttributes().get(ARN_ATTRIBUTE_NAME);
}
}
在运行上面代码前,要在{HOME}/.aws目录下配置credentials,用户要有SQS权限:
[default]
aws_access_key_id = AAAAAAAAAAAAAA
aws_secret_access_key = MXXXXXXXXXXXXXXXXXXXXXX9
测试一下:
// 创建Dead Letter Queue
String deadLetterQueueArn = createDeadLetterQueue("DeadLetterQueue");
// 创建Task Queue
String queueUrl = createQueue("TaskQueue");
// 配置Dead Letter Queue
configDeadLetterQueue(queueUrl, deadLetterQueueArn);
// 发送Message
for (int i = 0; i <6; i++) {
sendMessage(queueUrl, "Hello COCO " + i);
}
// 接收Message
receiveMessages(queueUrl);
// 删除Queue
deleteQueue(queueUrl);
Lambda
Lambda函数定义支持两种方式 :
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.Context;
public class Hello implements RequestHandler {
// Request,Response为自定义的类型
public Response handleRequest(Request request, Context context) {
String greetingString = String.format("Hello %s %s.", request.firstName, request.lastName);
return new Response(greetingString);
}
}
outputType handler-name(inputType input, Context context) {
...
}
inputType 和 outputType 可为以下类型之一:
如不需要,可以省略处理程序方法签名中的 Context 对象。
先编写一个简单的测试用例接收SQS消息,输入参数input为Queue URL:
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.LambdaLogger;
import com.amazonaws.services.lambda.runtime.RequestHandler;
public class Hello implements RequestHandler {
@Override
public String handleRequest(String input, Context context) {
LambdaLogger logger = context.getLogger();
logger.log("received : " + input);
SqsUtil.receiveMessages(input);
return "success";
}
}
程序编写完了,如何放入到Lambda函数中呢?需要打成jar包,且须包含依赖包,pom中增加shade插件:
org.apache.maven.plugins
maven-shade-plugin
3.1.0
false
package
shade
下面通过Web Console创建Lambda Function
注意:role要有lambda、Cloudwatch Logs、SQS权限。
然后上传jar包,配置Handler
再调整一下内存配置和超时参数,保存。
配置测试参数,测试一下先:
执行成功输出:
下面修改一下代码,输入参数类型改为ScheduledEvent,将使用触发器CloudWatch Events调用。
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.LambdaLogger;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.ScheduledEvent;
public class Hello implements RequestHandler {
@Override
public String handleRequest(ScheduledEvent input, Context context) {
LambdaLogger logger = context.getLogger();
logger.log("received : " + input.toString() + "\n");
SqsUtil.receiveMessages("https://sqs.cn-north-1.amazonaws.com.cn/891245999999/TaskQueue");
return "success";
}
}
上传后,同样先手工测试一下,这次选择模板Scheduled Event
测试成功后,配置CloudWatch Events触发器,Rule Type选择Schedule expression:
保存后就可以定时调用lambda了,O。
AWS学习笔记(七)--集成SQS和Lambda