作者:横刀2502934567 | 来源:互联网 | 2024-09-30 22:13
一、生产者
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class SimpleKafkaProducer {
private static KafkaProducer producer;
private final static String TOPIC = "adienTest2" ;
public SimpleKafkaProducer (){
Properties props = new Properties();
props.put("bootstrap.servers" , "localhost:9092" );
props.put("acks" , "all" );
props.put("retries" , 0 );
props.put("batch.size" , 16384 );
props.put("linger.ms" , 1 );
props.put("buffer.memory" , 33554432 );
props.put("key.serializer" , "org.apache.kafka.common.serialization.StringSerializer" );
props.put("value.serializer" , "org.apache.kafka.common.serialization.StringSerializer" );
producer = new KafkaProducer(props);
}
public void produce (){
for (int i = 30 ;i<40 ;i++){
String key = String.valueOf(i);
String data = "hello kafka message:" +key;
producer.send(new ProducerRecord(TOPIC,key,data));
System.out.println(data);
}
producer.close();
}
public static void main (String[] args) {
new SimpleKafkaProducer().produce();
}
}
二、消费者
import org.apache .kafka .clients .consumer .ConsumerRecord
import org.apache .kafka .clients .consumer .ConsumerRecords
import org.apache .kafka .clients .consumer .KafkaConsumer
import org.apache .log 4j.Logger
import java.util .Arrays
import java.util .Properties
public class SimpleKafkaConsumer {
private static KafkaConsumer consumer
private final static String TOPIC = "adienTest2"
public SimpleKafkaConsumer(){
Properties props = new Properties()
props.put ("bootstrap.servers" , "localhost:9092" )
//每个消费者分配独立的组号
props.put ("group.id" , "test2" )
//如果value合法,则自动提交偏移量
props.put ("enable.auto.commit" , "true" )
//设置多久一次更新被消费消息的偏移量
props.put ("auto.commit.interval.ms" , "1000" )
//设置会话响应的时间,超过这个时间kafka可以选择放弃消费或者消费下一条消息
props.put ("session.timeout.ms" , "30000" )
//自动重置offset
props.put ("auto.offset.reset" ,"earliest" )
props.put ("key.deserializer" ,
"org.apache.kafka.common.serialization.StringDeserializer" )
props.put ("value.deserializer" ,
"org.apache.kafka.common.serialization.StringDeserializer" )
cOnsumer= new KafkaConsumer(props)
}
public void consume(){
consumer.subscribe (Arrays.asList (TOPIC))
while (true) {
ConsumerRecords records = consumer.poll (100 )
for (ConsumerRecord record : records){
System.out .printf ("offset = %d, key = %s, value = %s" ,record.offset (), record.key (), record.value ())
System.out .println ()
}
}
}
public static void main(String[] args) {
new SimpleKafkaConsumer().consume ()
}
}
三、运行
1、该生产者、消费者是运行本地搭建的kafka,所以先用命令行启动zookeeper,再启动kafka服务,还要创建本例中使用的topic:adienTest2
2、运行生产者代码,运行结果如下:
3、运行消费者代码,运行结果如下: