Author: 翔未央图_971 | Source: Internet | 2023-09-14 16:44
Kafka 2.6.0 (installation steps are covered separately)
Add the dependencies
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.6.0</version>
</dependency>
Configure KafkaAdminClient
spring.kafka.bootstrap-servers=192.168.25.132:9092
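Define the bean:
import java.util.Properties;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class KafkaConf {
    // Broker address, read from spring.kafka.bootstrap-servers
    @Value("${spring.kafka.bootstrap-servers}")
    private String server;

    @Bean
    public KafkaAdminClient kafkaAdminClient() {
        Properties props = new Properties();
        props.put("bootstrap.servers", server);
        // create() is declared to return AdminClient; the instance is a KafkaAdminClient, hence the cast
        return (KafkaAdminClient) KafkaAdminClient.create(props);
    }
}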
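The bean above only copies one value from the Spring configuration into the property name the Kafka client expects. As a minimal sketch of that mapping outside Spring, using only the JDK (the class name AdminPropsSketch and the method toAdminProps are hypothetical, introduced for illustration):

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

class AdminPropsSketch {
    // Reads Spring-style properties text and returns the Properties object
    // that would be handed to the Kafka admin client.
    static Properties toAdminProps(String applicationProperties) {
        Properties spring = new Properties();
        try {
            spring.load(new StringReader(applicationProperties));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }

        String servers = spring.getProperty("spring.kafka.bootstrap-servers");
        if (servers == null || servers.trim().isEmpty()) {
            throw new IllegalArgumentException("spring.kafka.bootstrap-servers is not set");
        }

        // The Kafka client itself expects the key "bootstrap.servers".
        Properties admin = new Properties();
        admin.put("bootstrap.servers", servers.trim());
        return admin;
    }

    public static void main(String[] args) {
        Properties admin = toAdminProps("spring.kafka.bootstrap-servers=192.168.25.132:9092\n");
        System.out.println(admin.getProperty("bootstrap.servers")); // prints 192.168.25.132:9092
    }
}
```

Failing fast on a missing address is a design choice of this sketch; it surfaces a configuration mistake at startup instead of at the first admin call.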
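Create a topic:
import java.util.Collections;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController // required so that Spring registers the request mappings
@RequestMapping("/kafka")
public class KafkaController {
    @Autowired
    private KafkaAdminClient kafkaAdminClient;

    @GetMapping("/createTopic")
    public String createTopic() throws Exception {
        NewTopic newTopic = new NewTopic("spring-topic", 3, (short) 1); // 3 partitions, replication factor 1
        // createTopics is asynchronous: wait for completion, then return a plain status string
        kafkaAdminClient.createTopics(Collections.singletonList(newTopic)).all().get();
        return "created";
    }
}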
After calling the endpoint, kafka-manager shows that the topic named spring-topic has been created.
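If kafka-manager is not at hand, the same check can be done from code. The following is a minimal sketch, assuming kafka-clients is on the classpath (it comes in with spring-kafka) and the broker address used in this article is reachable; the class name TopicCheck is hypothetical.

```java
import java.util.Properties;
import java.util.Set;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;

public class TopicCheck {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Broker address taken from the article; adjust to your environment.
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.25.132:9092");

        // AdminClient is AutoCloseable, so try-with-resources releases its threads and sockets.
        try (AdminClient admin = AdminClient.create(props)) {
            // listTopics() is asynchronous; get() blocks until the broker answers.
            Set<String> names = admin.listTopics().names().get();
            System.out.println("spring-topic exists: " + names.contains("spring-topic"));
        }
    }
}
```

This runs as a standalone program, so it does not depend on the controller or the Spring context being up.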
Configure the producer
spring.kafka.producer.retries=5
spring.kafka.producer.batch-size=16384
spring.kafka.producer.buffer-memory=33554432
spring.kafka.producer.acks=all
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
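The two numeric settings above are byte counts: batch-size is the upper limit for a single batch of records sent to one partition, and buffer-memory is the total memory the producer may use for records waiting to be sent. A small sketch of the arithmetic, using only the JDK (the class name ProducerBufferMath is hypothetical):

```java
class ProducerBufferMath {
    static final int BATCH_SIZE_BYTES = 16_384;           // spring.kafka.producer.batch-size
    static final long BUFFER_MEMORY_BYTES = 33_554_432L;  // spring.kafka.producer.buffer-memory

    static long batchSizeKiB() {
        return BATCH_SIZE_BYTES / 1024;
    }

    static long bufferMemoryMiB() {
        return BUFFER_MEMORY_BYTES / (1024 * 1024);
    }

    // Upper bound on the number of completely filled batches the buffer can hold at once.
    static long maxFullBatches() {
        return BUFFER_MEMORY_BYTES / BATCH_SIZE_BYTES;
    }

    public static void main(String[] args) {
        System.out.println(batchSizeKiB() + " KiB per batch");      // prints 16 KiB per batch
        System.out.println(bufferMemoryMiB() + " MiB buffer");      // prints 32 MiB buffer
        System.out.println(maxFullBatches() + " full batches fit"); // prints 2048 full batches fit
    }
}
```

Both values match the Kafka producer defaults, so these two lines mainly document the settings rather than change behavior.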
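Define the endpoint for sending messages:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController // required so that Spring registers the request mappings
@RequestMapping("/kafka")
public class KafkaController {
    // Key and value types match the StringSerializer settings above
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @GetMapping("/send")
    public String send(@RequestParam String message) {
        // Sends the message without a key to the topic created earlier
        kafkaTemplate.send("spring-topic", message);
        return "success";
    }
}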
Configure the consumer
spring.kafka.consumer.group-id=group-1
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=100
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
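Define the message listener:
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class ConsumerListener {
    // Must listen on the same topic that the send endpoint writes to
    @KafkaListener(topics = "spring-topic")
    public void onListener(String message) {
        System.out.println(String.format("Received message: %s", message));
    }
}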
Test
Call the send endpoint at http://localhost:99/kafka/send?message=abc, then check the IDE console: the consumer prints the message it received.
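The same request can be issued from code, which also makes it easy to send messages containing spaces or non-ASCII characters. A minimal sketch using the JDK 11 HTTP client follows; the class name SendEndpointClient and its methods are hypothetical, and the base URL is the one used in this article.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;

class SendEndpointClient {
    // Base URL taken from the article; adjust host and port to your application.
    static final String BASE = "http://localhost:99/kafka/send";

    // Builds the request URI, URL-encoding the message so that spaces and
    // non-ASCII characters survive the query string.
    static URI buildSendUri(String message) {
        String encoded = URLEncoder.encode(message, StandardCharsets.UTF_8);
        return URI.create(BASE + "?message=" + encoded);
    }

    // Sends the GET request and returns the response body.
    // Only works while the Spring Boot application is running.
    static String send(String message) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder(buildSendUri(message)).GET().build();
        HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
        return response.body();
    }

    public static void main(String[] args) {
        // Prints the URIs only, so this runs without the application being up.
        System.out.println(buildSendUri("abc"));         // http://localhost:99/kafka/send?message=abc
        System.out.println(buildSendUri("hello kafka")); // http://localhost:99/kafka/send?message=hello+kafka
    }
}
```

With the application running, calling SendEndpointClient.send("abc") should return the response body of the send endpoint, and the listener should log the message on the consumer side.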