第一步:在终端启动一个消费者,等待消费生产者生产出来的数据
代码实现
创建Maven项目
<dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.12</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.4.1</version>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-api</artifactId>
    <version>1.7.25</version>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.7.25</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.12</artifactId>
    <version>2.4.1</version>
</dependency>
- 在resources目录下添加log4j.properties
### 设置 ###
log4j.rootLogger = debug,stdout,D,E

### 输出信息到控制台 ###
log4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target = System.out
log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern = [%-5p] %d{yyyy-MM-dd HH:mm:ss,SSS} method:%l%n%m%n

### 输出DEBUG 级别以上的日志到 E://logs/log.log ###
log4j.appender.D = org.apache.log4j.DailyRollingFileAppender
log4j.appender.D.File = E://logs/log.log
log4j.appender.D.Append = true
log4j.appender.D.Threshold = DEBUG
log4j.appender.D.layout = org.apache.log4j.PatternLayout
log4j.appender.D.layout.ConversionPattern = %-d{yyyy-MM-dd HH:mm:ss} [ %t:%r ] - [ %p ] %m%n

### 输出ERROR 级别以上的日志到 E://logs/error.log ###
log4j.appender.E = org.apache.log4j.DailyRollingFileAppender
log4j.appender.E.File = E://logs/error.log
log4j.appender.E.Append = true
log4j.appender.E.Threshold = ERROR
log4j.appender.E.layout = org.apache.log4j.PatternLayout
log4j.appender.E.layout.ConversionPattern = %-d{yyyy-MM-dd HH:mm:ss} [ %t:%r ] - [ %p ] %m%n
情况一:创建生产者
public class CustomerProducer {public static void main(String[] args) {Properties props &#61; new Properties();props.put("bootstrap.servers", "hcmaster:9092");props.put("acks", "all");props.put("retries", 0);props.put("batch.size", 16384);props.put("linger.ms", 1);props.put("buffer.memory", 33554432);props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");KafkaProducer<String, String> producer &#61; new KafkaProducer<>(props);for (int i &#61; 0; i < 8; i&#43;&#43;) {ProducerRecord<String, String> data &#61; new ProducerRecord<>("first", Integer.toString(i), "haha-" &#43; i);producer.send(data);}producer.close();}
}
运行程序,在Consumer终端上查看结果:
情况二:创建带回调的生产者
public class CallBackProducer {public static void main(String[] args) throws InterruptedException {Properties props &#61; new Properties();props.put("bootstrap.servers", "hcmaster:9092,hcslave1:9092,hcslave2:9092");props.put("acks", "all");props.put("retries", 0);props.put("batch.size", 16384);props.put("linger.ms", 1);props.put("buffer.memory", 33554432);props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");KafkaProducer<String, String> kafkaProducer &#61; new KafkaProducer<>(props);for (int i &#61; 0; i < 8; i&#43;&#43;) {Thread.sleep(500);ProducerRecord<String, String> pr &#61; new ProducerRecord<>("first", Integer.toString(i), "hehe-" &#43; i);kafkaProducer.send(pr, new Callback() {&#64;Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if(exception !&#61; null){System.out.println("发送失败");}else {System.out.print("发送成功&#xff1a; ");if (metadata !&#61; null) {System.out.println(metadata.topic()&#43;" - "&#43;metadata.partition() &#43; " - " &#43; metadata.offset());}}}});}kafkaProducer.close();}}
在IntelliJ控制台中查看结果:
在Consumer终端中查看结果:
情况三:创建自定义分区的生产者
public class CustomPartitioner implements Partitioner {&#64;Overridepublic void configure(Map<String, ?> configs) {}&#64;Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {return 1;}&#64;Overridepublic void close() {}
}
public class CallBackProducer {public static void main(String[] args) throws InterruptedException {Properties props &#61; new Properties();props.put("bootstrap.servers", "hcmaster:9092,hcslave1:9092,hcslave2:9092");props.put("acks", "all");props.put("retries", 0);props.put("batch.size", 16384);props.put("linger.ms", 1);props.put("buffer.memory", 33554432);props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("partitioner.class", "com.hc.producer.customerparitioner.CustomPartitioner");KafkaProducer<String, String> kafkaProducer &#61; new KafkaProducer<>(props);for (int i &#61; 0; i < 8; i&#43;&#43;) {Thread.sleep(500);ProducerRecord<String, String> pr &#61; new ProducerRecord<>("first", Integer.toString(i), "hello world-" &#43; i);kafkaProducer.send(pr, (metadata, exception) -> {if (exception !&#61; null) {System.out.println("发送失败");} else {System.out.print("发送成功&#xff1a; ");if (metadata !&#61; null) {System.out.println(metadata.partition() &#43; " --- " &#43; metadata.offset());}}});}kafkaProducer.close();}}
在IntelliJ控制台中查看结果:
在Consumer终端中查看结果: