一、Producer
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class SimpleKafkaProducer {

    private static KafkaProducer<String, String> producer;
    private final static String TOPIC = "adienTest2";

    public SimpleKafkaProducer() {
        Properties props = new Properties();
        // Address of the Kafka broker
        props.put("bootstrap.servers", "localhost:9092");
        // Wait for all in-sync replicas to acknowledge each record
        props.put("acks", "all");
        // Do not retry failed sends
        props.put("retries", 0);
        // Batch size in bytes per partition
        props.put("batch.size", 16384);
        // How long (ms) to wait for more records before sending a batch
        props.put("linger.ms", 1);
        // Total memory (bytes) available for buffering unsent records
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producer = new KafkaProducer<>(props);
    }

    public void produce() {
        for (int i = 30; i < 40; i++) {
            String key = String.valueOf(i);
            String data = "hello kafka message:" + key;
            producer.send(new ProducerRecord<>(TOPIC, key, data));
            System.out.println(data);
        }
        producer.close();
    }

    public static void main(String[] args) {
        new SimpleKafkaProducer().produce();
    }
}
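The send() call above is fire-and-forget: it returns a Future that is never checked, so a failed send goes unnoticed. A minimal sketch of attaching a per-record callback instead (the callback body here is illustrative, not part of the original example):

    producer.send(new ProducerRecord<>(TOPIC, key, data), (metadata, exception) -> {
        if (exception != null) {
            // The send failed after any configured retries
            exception.printStackTrace();
        } else {
            // metadata carries the partition and offset assigned to the record
            System.out.println("sent to partition " + metadata.partition() + ", offset " + metadata.offset());
        }
    });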
二、Consumer
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

public class SimpleKafkaConsumer {

    private static KafkaConsumer<String, String> consumer;
    private final static String TOPIC = "adienTest2";

    public SimpleKafkaConsumer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        // Consumer group id; consumers in the same group share the topic's partitions
        props.put("group.id", "test2");
        // Commit offsets automatically
        props.put("enable.auto.commit", "true");
        // How often (ms) the consumed offsets are committed
        props.put("auto.commit.interval.ms", "1000");
        // If no heartbeat arrives within this time, the consumer is considered dead
        // and its partitions are rebalanced to other members of the group
        props.put("session.timeout.ms", "30000");
        // Where to start when there is no committed offset: from the earliest record
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        consumer = new KafkaConsumer<>(props);
    }

    public void consume() {
        consumer.subscribe(Arrays.asList(TOPIC));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
                System.out.println();
            }
        }
    }

    public static void main(String[] args) {
        new SimpleKafkaConsumer().consume();
    }
}
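With enable.auto.commit=true as above, offsets are committed on a timer, so a record polled but not yet committed can be re-delivered after a crash. A minimal sketch of the manual alternative, assuming the same properties with auto-commit turned off:

    // In the constructor: disable the timer-based commit
    props.put("enable.auto.commit", "false");

    // In consume(): commit explicitly only after each polled batch has been processed
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        }
        consumer.commitSync();
    }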
三、Running the example
1、The producer and consumer run against a locally installed Kafka, so first start ZooKeeper from the command line, then start the Kafka broker, and create the topic used in this example, adienTest2; typical commands are sketched below.
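For reference, run from the Kafka installation directory; the topic-creation flags vary by version (older releases register topics via --zookeeper localhost:2181, newer ones via --bootstrap-server), so adjust to your setup:

    bin/zookeeper-server-start.sh config/zookeeper.properties
    bin/kafka-server-start.sh config/server.properties
    bin/kafka-topics.sh --create --topic adienTest2 --partitions 1 --replication-factor 1 --bootstrap-server localhost:9092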
2、Run the producer; the console output looks like this:
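Based on the loop in produce(), the producer prints the ten generated messages:

    hello kafka message:30
    hello kafka message:31
    ...
    hello kafka message:39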
3、Run the consumer; the console output looks like this:
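The consumer prints each record with its offset in the format used by printf above; the offset values depend on what is already stored in the topic, so the numbers below are only illustrative:

    offset = 0, key = 30, value = hello kafka message:30
    offset = 1, key = 31, value = hello kafka message:31
    ...
    offset = 9, key = 39, value = hello kafka message:39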