kafka生产者发送消息,消费者接受消息
导入依赖
<dependency><groupId>org.apache.kafkagroupId><artifactId>kafka-clientsartifactId><version>2.1.0version>
dependency>
生产者发送消息
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;
public class ProducerQuickStart {public static void main(String[] args) {Properties properties &#61; new Properties();properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.200.130:9092");properties.put(ProducerConfig.RETRIES_CONFIG,5);properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");KafkaProducer<String,String> producer &#61; new KafkaProducer<String, String>(properties);ProducerRecord<String,String> record &#61; new ProducerRecord<String, String>("test-topic","100001","hello kafka");producer.send(record);producer.close();}}
消费者接收消息
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class ConsumerQuickStart {public static void main(String[] args) {Properties properties &#61; new Properties();properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.200.130:9092");properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group2");properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer<String, String> consumer &#61; new KafkaConsumer<String, String>(properties);consumer.subscribe(Collections.singletonList("test-topic"));while (true) {ConsumerRecords<String, String> consumerRecords &#61; consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {System.out.println(consumerRecord.key());System.out.println(consumerRecord.value());}}}}