消息中间件 | 建议 |
---|---|
Kafka | 追求高吞吐量,适合产生大量数据的互联网服务的数据收集业务 |
RocketMQ | 可靠性要求很高的金融互联网领域,稳定性高,经历了多次阿里双11考验 |
RabbitMQ | 性能较好,社区活跃度高,数据量没有那么大,优先选择功能比较完备的RabbitMQ |
Kafka 是一个分布式流媒体平台,类似于消息队列或企业消息传递系统。kafka官网:http://kafka.apache.org/
名词解释
Kafka对于zookeeper是强依赖,保存kafka相关的节点数据,所以安装Kafka之前必须先安装zookeeper
docker pull zookeeper:3.4.14
创建容器
docker run -d --name zookeeper -p 2181:2181 zookeeper:3.4.14
docker pull wurstmeister/kafka:2.12-2.3.1
创建容器
docker run -d --name kafka \--env KAFKA_ADVERTISED_HOST_NAME=192.168.200.130 \--env KAFKA_ZOOKEEPER_CONNECT=192.168.200.130:2181 \--env KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.200.130:9092 \--env KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \--env KAFKA_HEAP_OPTS="-Xmx256M -Xms256M" \--net=host wurstmeister/kafka:2.12-2.3.1
#--net=host,直接使用容器宿主机的网络命名空间, 即没有独立的网络环境。它使用宿主机的ip和端口
下载:http://www.kafkatool.com/download.html
创建kafka-demo项目,导入依赖
编写代码:
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;/*** 生产者*/
public class ProducerQuickStart {public static void main(String[] args) {//1.kafka的配置信息Properties properties &#61; new Properties();//kafka的连接地址properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.200.130:9092");//发送失败&#xff0c;失败的重试次数properties.put(ProducerConfig.RETRIES_CONFIG,5);//消息key的序列化器properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");//消息value的序列化器properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");//2.生产者对象KafkaProducer<String,String> producer &#61; new KafkaProducer<String, String>(properties);//封装发送的消息ProducerRecord<String,String> record &#61; new ProducerRecord<String, String>("test-topic","100001","hello kafka");//3.发送消息producer.send(record);//4.关闭消息通道&#xff0c;必须关闭&#xff0c;否则消息发送不成功producer.close();}}
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;import java.time.Duration;
import java.util.Collections;
import java.util.Properties;/*** 消费者*/
public class ConsumerQuickStart {public static void main(String[] args) {//1.添加kafka的配置信息Properties properties &#61; new Properties();//kafka的连接地址properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.200.130:9092");//消费者组properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group2");//消息的反序列化器properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");//2.消费者对象KafkaConsumer<String, String> consumer &#61; new KafkaConsumer<String, String>(properties);//3.订阅主题consumer.subscribe(Collections.singletonList("test-topic"));//当前线程一直处于监听状态while (true) {//4.获取消息ConsumerRecords<String, String> consumerRecords &#61; consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {System.out.println(consumerRecord.key());System.out.println(consumerRecord.value());}}}}
//设置相同的消费者组prop.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
//设置相同的消费者组prop.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
第二个消费者设置 group2&#xff1a;
//设置相同的消费者组prop.put(ConsumerConfig.GROUP_ID_CONFIG, "group2");
Kafka 中的分区机制指的是将每个主题划分成多个分区&#xff08;Partition&#xff09;
优势&#xff1a;可以处理更多的消息&#xff0c;不受单台服务器的限制&#xff0c;可以不受限的处理更多的数据
一个 topic 可以包含多个 分区partition&#xff0c;topic 消息保存在各个 partition 上&#xff1b;由于一个 topic 能被分到多个分区上&#xff0c;给 kafka 提供给了并行的处理能力&#xff0c;这也正是 kafka 高吞吐的原因之一。
每一个分区都是一个顺序的、不可变的消息队列&#xff0c; 并且可以持续的添加。分区中的消息都被分了一个序列号&#xff0c;称
之为偏移量(offset)&#xff1a;消息在日志中的位置&#xff0c;可以理解是消息在 partition 上的偏移量&#xff0c;也是代表消息的唯一序号。
默认的分区数量&#xff0c;可以在config/server.properties中num.partitions&#61;n配置
默认采用轮询策略&#xff0c;如果发送消息时指定了key则采用按键保存策略
Kafka 中消息的备份又叫做副本&#xff08;Replica&#xff09;&#xff0c;其中副本又分为两种类型&#xff1a;
在 Kafka 中的 Partition 有一个 leader 与多个 follower&#xff0c;producer 往某个 Partition 中写入数据时&#xff0c;只会往 leader 中写入数据&#xff0c;然后数据会被复制进其他的 follower中。而每一个 follower 可以理解成一个消费者&#xff0c;定期去 leader 拉取消息。而只有数据同步了后&#xff0c;kafka 才会给生产者返回一个 ACK 告知消息已经存储落地了。
kafka不是完全同步&#xff0c;也不是完全异步&#xff0c;是一种特殊的ISR&#xff08;In Sync Replica&#xff09;&#xff0c;为了保证性能&#xff0c;Kafka 不会采用强一致性的方式来同步主从的数据。
如果leader宕机后&#xff0c;需要选出新的leader&#xff0c;选举的原则如下&#xff1a;
第一&#xff1a;选举时优先从ISR中选定&#xff0c;因为这个列表中follower的数据是与leader同步的
第二&#xff1a;如果ISR列表中的follower都不行了&#xff0c;就只能从其他follower中选取
极端情况&#xff0c;就是所有副本都失效了&#xff0c;这时有两种方案第一&#xff1a;等待ISR中的一个活过来&#xff0c;选为Leader&#xff0c;数据可靠&#xff0c;但活过来的时间不确定第二&#xff1a;选择第一个活过来的Replication&#xff0c;不一定是ISR中的&#xff0c;选为leader&#xff0c;以最快速度恢复可用性&#xff0c;但数据不一定完整
发送类型
RecordMetadata recordMetadata &#61; producer.send(kvProducerRecord).get();System.out.println(recordMetadata.offset());
//异步消息发送producer.send(record, new Callback() {&#64;Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {if(e !&#61; null){System.out.println("记录异常信息到日志表中");}System.out.println(recordMetadata.offset());}});
6.2)参数详解-08:30
//ack配置 消息确认机制
prop.put(ProducerConfig.ACKS_CONFIG,"all");
参数的选择说明
确认机制 | 说明 |
---|---|
acks&#61;0 | 生产者在成功写入消息之前不会等待任何来自服务器的响应,消息有丢失的风险&#xff0c;但是速度最快 |
acks&#61;1&#xff08;默认值&#xff09; | 只要集群首领节点收到消息&#xff0c;生产者就会收到一个来自服务器的成功响应 |
acks&#61;all | 只有当所有参与赋值的节点全部收到消息时&#xff0c;生产者才会收到一个来自服务器的成功响应 |
代码中配置方式&#xff1a;
//重试次数
prop.put(ProducerConfig.RETRIES_CONFIG,10);
消息压缩
默认情况下&#xff0c; 消息发送时不会被压缩。
代码中配置方式&#xff1a;
//数据压缩
prop.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"lz4");
压缩算法 | 说明 |
---|---|
snappy | 占用较少的 CPU&#xff0c; 却能提供较好的性能和相当可观的压缩比&#xff0c; 如果看重性能和网络带宽&#xff0c;建议采用 |
lz4 | 占用较少的 CPU&#xff0c; 压缩和解压缩速度较快&#xff0c;压缩比也很客观 |
gzip | 占用较多的 CPU&#xff0c;但会提供更高的压缩比&#xff0c;网络带宽有限&#xff0c;可以使用这种算法 |
使用压缩可以降低网络传输开销和存储开销&#xff0c;而这往往是向 Kafka 发送消息的瓶颈所在。
7.2)消息有序性
应用场景&#xff1a;
仅仅是保证Topic的一个分区顺序处理&#xff0c;不能保证跨分区的消息先后处理顺序。
所以 Kafka 要保证消息的消费顺序&#xff0c;可以有2种方法&#xff1a;
一、1个Topic&#xff08;主题&#xff09;只创建1个Partition(分区)&#xff0c;这样生产者的所有数据都发送到了一个Partition(分区)&#xff0c;保证了消息的消费顺序。
二、生产者在发送消息的时候指定要发送到哪个Partition(分区)。
1&#xff09;指明 partition 的情况下&#xff0c;直接将指明的值直接作为 partiton 值&#xff1b;
2&#xff09;指定key&#xff1a;具有同1个 key 的所有消息&#xff0c;会发往同1个 partition。也是有序的。
ProducerRecord
kafka不会像其他JMS队列那样需要得到消费者的确认。
不过消费者可以使用kafka来追踪消息在分区的位置&#xff08;偏移量&#xff09;&#xff0c;消费者会往一个叫做_consumer_offset的特殊主题发送消息&#xff0c;消息里包含了每个分区的偏移量。把当前消费的位置存储起来&#xff08;持久化&#xff09;的动作称为 “提交” &#xff0c;消费者在消费完消息之后需要执行消费偏移量&#xff08;offset&#xff09;的提交。
因此消费者提交消息的偏移量就变得尤其重要&#xff0c;Kafka提交偏移量的方式有两种&#xff1a;
- 自动提交偏移量&#xff08;默认方式&#xff09;
当enable.auto.commit被设置为true&#xff0c;提交方式就是让消费者自动提交偏移量&#xff0c;每隔5秒消费者会自动把从
poll()方法接收的最大偏移量提交上去
注意&#xff1a;Kafka 自动提交消费位移的方式非常简便&#xff0c;它免去了复杂的位移提交逻辑&#xff0c;但并没有为开发者留有余地来处理重复消费和消息丢失的问题。自动位移提交无法做到精确的位移管理
1.同步提交commitSync()
把enable.auto.commit设置为false,让应用程序决定何时提交偏移量。使用commitSync()提交偏移量&#xff0c;commitSync()将会提交poll返回的最新的偏移量&#xff0c;所以在处理完所有记录后要确保调用了commitSync()方法。否则还是会有消息丢失的风险。
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
只要没有发生不可恢复的错误&#xff0c;commitSync()方法会一直尝试直至提交成功&#xff0c;如果提交失败也可以记录错误日志
while (true){ConsumerRecords<String, String> records &#61; consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> record : records) {System.out.println(record.value());System.out.println(record.key());try {consumer.commitSync();//同步提交当前最新的偏移量}catch (CommitFailedException e){System.out.println("记录提交失败的异常&#xff1a;"&#43;e);}}
}
上述提交有一个缺点&#xff0c;那就是当发起提交调用时应用会阻塞。当然我们可以减少手动提交的频率&#xff0c;但这个会增加消息重复的概率&#xff08;和自动提交一样&#xff09;。另外一个解决办法是&#xff0c;使用异步提交的API。
2.异步提交commitAsync()
异步提交的方式在执行的时候消费者线程不会被阻塞&#xff0c;可以在提交消费位移的结果还未返回之前就开始新一次的拉取操作。异步提交可以使消费者的性能得到一定的增强。
while (true){ConsumerRecords<String, String> records &#61; consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> record : records) {System.out.println(record.value());System.out.println(record.key());}consumer.commitAsync(new OffsetCommitCallback() {&#64;Overridepublic void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) {if(e!&#61;null){System.out.println("记录错误的提交偏移量&#xff1a;"&#43; map&#43;",异常信息"&#43;e);}}});}
异步提交也有个缺点&#xff0c;那就是如果服务器返回提交失败&#xff0c;异步提交不会进行重试。相比较起来&#xff0c;同步提交会进行重试直到成功或者最后抛出异常给应用。异步提交没有实现重试是因为&#xff0c;如果同时存在多个异步提交&#xff0c;进行重试可能会导致位移覆盖。
举个例子&#xff0c;假如我们发起了一个异步提交commitA&#xff0c;此时的提交位移为2000&#xff0c;随后又发起了一个异步提交commitB且位移为3000&#xff1b;commitA提交失败但commitB提交成功&#xff0c;此时commitA进行重试并成功的话&#xff0c;会将实际上将已经提交的位移从3000回滚到2000&#xff0c;导致消息重复消费。
3.同步和异步组合提交
因此&#xff0c;在消费者关闭前一般会组合使用 commitAsync() 和 commitSync() 。使用 commitAsync() 方式来做每条消费信息的提交&#xff08;因为该种方式速度更快&#xff09;&#xff0c;最后再使用 commitSync() 方式来做位移提交最后的保证。
try {while (true){ConsumerRecords<String, String> records &#61; consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> record : records) {System.out.println(record.value());System.out.println(record.key());}consumer.commitAsync();}
}catch (Exception e){e.printStackTrace();System.out.println("记录错误信息&#xff1a;"&#43;e);
}finally {try {consumer.commitSync();}finally {consumer.close();}
}
1.导入spring-kafka依赖信息
2.在resources下创建文件application.yml
server:port: 9991
spring:application:name: kafka-demokafka:bootstrap-servers: 192.168.200.130:9092producer:retries: 10key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerconsumer:group-id: ${spring.application.name}-testkey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer
3.消息生产者
import org.springframework.beans.factory.annotation.Autowired;import org.springframework.kafka.core.KafkaTemplate;import org.springframework.web.bind.annotation.GetMapping;import org.springframework.web.bind.annotation.RestController;&#64;RestControllerpublic class HelloController {&#64;Autowiredprivate KafkaTemplate<String,String> kafkaTemplate;&#64;GetMapping("/hello")public String hello(){kafkaTemplate.send("test-topic","程序员");return "ok";}}
4.消息消费者
package com.test.kafka.listener;import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;&#64;Component
public class HelloListener {&#64;KafkaListener(topics &#61; "test-topic")public void onMessage(String message){if(!StringUtils.isEmpty(message)){System.out.println(message);}}
}
8.2)传递消息为对象
目前springboot整合后的kafka&#xff0c;因为序列化器是StringSerializer&#xff0c;这个时候如果需要传递对象可以有两种方式
方式一&#xff1a;可以自定义序列化器&#xff0c;对象类型众多&#xff0c;这种方式通用性不强&#xff0c;本章节不介绍
方式二&#xff08;常用&#xff09;&#xff1a;可以把要传递的对象进行转json字符串&#xff0c;接收消息后再转为对象即可
&#64;GetMapping("/hello")
public String hello(){User user &#61; new User();user.setUsername("xiaowang");user.setAge(18);kafkaTemplate.send("user-topic", JSON.toJSONString(user));return "ok";
}
package com.test.kafka.listener;import com.alibaba.fastjson.JSON;import com.test.kafka.pojo.User;import org.springframework.kafka.annotation.KafkaListener;import org.springframework.stereotype.Component;import org.springframework.util.StringUtils;&#64;Componentpublic class HelloListener {&#64;KafkaListener(topics &#61; "user-topic")public void onMessage(String message){if(!StringUtils.isEmpty(message)){User user &#61; JSON.parseObject(message, User.class);System.out.println(user);}}}