作者:树缝中间_324 | 来源:互联网 | 2023-09-03 19:27
上节说了kafka在linux环境下怎么搭建集群。这节写一下怎么在springcloud中以stream流方式去做kafka集群对接。1.yml配置#springCloudkafk
上节说了kafka在linux环境下怎么搭建集群。这节写一下怎么在springcloud中以stream流方式去做kafka集群对接。
1.yml配置
#spring Cloud kafka -- streams --
cloud:
stream:
kafka:
binder:
minPartitionCount: 3 # 分区数量,主要就是为了减轻单台服务器的压力,扩大并发量
brokers: 192.168.100.100:9092,192.168.100.101:9092,192.168.100.102:9092 # kafka服务地址和端口
autoCreateTopics: true
autoAddPartitions: true
2.消息发送
@RestController
@RequestMapping("/kafka")
@EnableBinding(value = {WarningStreams.class})
public class kafkaTest {
@Autowired
private MessageService messageService;
/**
* 测试消息发送,入参就是你的topic,进行发送的时候就算kafka中没有该topic,他也会自动创建一个你传入的topic
* 这里面的Msg是我封装的一个消息对象,可以是随意的一个消息对象,字符串也可以
* @param topic
*/
@RequestMapping("/sendMsg")
public void sendMsg(String topic){
// 循环发送6次消息,分别发送在不同的分区
for (int i=0; i<=5; i++ ) {
Msg msg = new Msg();
msg.setData(null);
msg.setTaskId("1");
msg.setMsg("测试消息发送");
msg.setMsgId(System.currentTimeMillis() + MathUtil.getFiveRandom());
msg.setSuccess("true");
msg.setCode("200");
msg.setMsgType(100);
String result = messageService.sendControl(msg, topic);
System.out.println(result);
}
}
}
messageService类:
@Service
public class MessageService {
private Logger logger = LoggerFactory.getLogger(getClass());
@Autowired
private BinderAwareChannelResolver resolver;
/**
* 发送预警消息到指定topic,这里的topic是由平台编码+平台名称组成
* 若发现kafka中没有该topic,它会自动创建一个由平台编码+平台名称组成的topic
* @param warnings
* @param topic
* @return
*/
public String sendWarning(final Msg warnings, String topic) {
logger.info("Sending warnings {}", warnings);
// 获取预警的topic,然后发送预警消息到kafka的topic
MessageChannel messageChannel = resolver.resolveDestination(topic);
messageChannel.send(MessageBuilder
.withPayload(warnings)
.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
.build());
return "send msg ok";
}
/**
* 发送布控消息到指定topic,这里的topic是由平台编码+平台名称组成
* 若发现kafka中没有该topic,它会自动创建一个由平台编码+平台名称组成的topic
* @param msg
* @param topic
* @return
*/
public String sendControl(final Msg msg, String topic) {
logger.info("Sending controlMsg {}", JSON.toJSONString(msg));
// 获取布控的topic,然后发送布控消息到kafka的topic
MessageChannel messageChannel = resolver.resolveDestination(topic);
messageChannel.send(MessageBuilder
.withPayload(msg)
.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
.build());
return "send msg ok";
}
}
发送完毕后会在服务器中的topic下看到你传入的那个topic,并且会有三个分区,每个分区分别对应三台服务器并且每台服务器中会有两条消息,如下图:
3.消息接收
@RestController
@RequestMapping("/kafka")
@EnableBinding(value = {WarningStreams.class})
public class kafkaTest {
/**
* 测试消息接收,接收对象用Object,否则收不到
* @param playLoad
*/
@StreamListener(WarningStreams.INPUT)
public void receive(Object playLoad) {
System.out.println("消息消费..result=="+ JSON.toJSONString(playLoad));
}
}
当消息被消费后,分区中的数据释放被清空,但是会保存在硬盘的log日志中。也就是在server.properties中你配置的log目录