作者:翔未央图_971 | 来源:互联网 | 2023-09-14 16:44
kafka 2.6.0(安装步骤查看这里)
引入依赖
< dependency>
< groupId> org.springframework.boot< /groupId>
< artifactId> spring-boot-starter-web< /artifactId>
< /dependency>
< dependency>
< groupId> org.springframework.kafka< /groupId>
< artifactId> spring-kafka< /artifactId>
< version> 2.6.0< /version>
< /dependency>
配置KafkaAdminClient
spring.kafka.bootstrap-servers= 192.168.25.132:9092
定义Bean:
@Configuration
public class KafkaConf {
@Value ( "${spring.kafka.bootstrap-servers}" )
private String server;
@Bean
public KafkaAdminClient kafkaAdminClient ( ) {
Properties props = new Properties ( ) ;
props. put ( "bootstrap.servers" , server) ;
return ( KafkaAdminClient) KafkaAdminClient. create ( props) ;
}
}
创建Topic:
@RequestMapping ( "/kafka" )
public class KafkaController {
@Autowired
private KafkaAdminClient kafkaAdminClient;
@GetMapping ( "/createTopic" )
public CreateTopicsResult createTopic ( ) {
NewTopic newTopic = new NewTopic ( "spring-topic" , 3 , ( short ) 1 ) ;
CreateTopicsResult result = kafkaAdminClient. createTopics ( Arrays. asList ( newTopic) ) ;
return result;
}
}
执行后,通过kafka-manager查看,名称为spring-topic的主题已经创建成功:
配置producer
spring.kafka.producer.retries= 5
spring.kafka.producer.batch-size= 16384
spring.kafka.producer.buffer-memory= 33554432
spring.kafka.producer.acks= all
spring.kafka.producer.key-serializer= org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer= org.apache.kafka.common.serialization.StringSerializer
定义发送消息接口:
@RequestMapping ( "/kafka" )
public class KafkaController {
@Autowired
private KafkaTemplate< String, Object> kafkaTemplate;
@GetMapping ( "/send" )
public String send ( @RequestParam String message) {
kafkaTemplate. send ( "spring-topic" , message) ;
return "success" ;
}
}
配置consumer
spring.kafka.consumer.group-id= group-1
spring.kafka.consumer.enable-auto-commit= true
spring.kafka.consumer.auto-commit-interval= 100
spring.kafka.consumer.key-deserializer= org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer= org.apache.kafka.common.serialization.StringDeserializer
定义消息接收监听器:
@Component
public class ConsumerListener {
@KafkaListener ( topics = "test" )
public void onListener ( String message) {
System. out. println ( String. format ( "接收到消息:%s" , message) ) ;
}
}
测试
调用发送消息接口:http://localhost:99/kafka/send?message=abc,然后查看IDE窗口,consumer输出了接收到的消息