版本:kafka_2.13-2.8.0
1、查看当前服务器中的所有topic
bin/kafka-topics.sh
--list --bootstrap-server hadoop201:9092
2、创建topic
bin/kafka-topics.sh
--create --bootstrap-server hadoop201:9092 --topic firstTopic
也可以在创建 topic 时,手动方式指定分区数、副本数
bin/kafka-topics.sh
--create --bootstrap-server hadoop201:9092 --replication-factor 2 --partitions 3 --topic firstTopic
备注:
replication-factor 2 备份数(2个备份)
partitions 3 分区数(3个分区)
topic firstTopic firstTopic为topic的名称
3、显示topic详细信息
bin/kafka-topics.sh
--describe --topic firstTopic --bootstrap-server hadoop201:9092
[hadoop@hadoop201 kafka]$ bin/kafka-topics.sh --describe --topic firstTopic --bootstrap-server hadoop201:9092
Topic: firstTopic TopicId: n0-rqoLVQkqsriiCElr4mw PartitionCount: 1 ReplicationFactor: 3 Configs: segment.bytes=1073741824Topic: firstTopic Partition: 0 Leader: 2 Replicas: 2,1,0 Isr: 2,1,0
4、删除topic
bin/kafka-topics.sh
--delete --bootstrap-server hadoop201:9020 --topic firstTopic
- 老版本:需要server.properties中设置
delete.topic.enable=true
,否则只是标记删除或者直接重启。 - 新版本:已经没有
delete.topic.enable=true
这个配置,可以直接删除 topic 了。如果有生产者或者消费者在使用该topic,需要停掉生产者或者消费者才能进行删除)
5、命令行发送消息
bin/kafka-console-producer.sh
--topic firstTopic --bootstrap-server hadoop201:9092
备注:
kafka01:主机名(通过vi /etc/sysconfig/network,修改hostname即可,此处可填IP地址)
orderMq:为topic的名称
6、命令行消费消息
bin/kafka-console-consumer.sh
--topic firstTopic --from-beginning --bootstrap-server hadoop201:9092
备注:
--from-beginning:会把主题中以往所有的数据都读取出来。即:从头开始读
7、查看消费位置
1.使用 --all-groups查看所有组
bin/kafka-consumer-groups.sh
--bootstrap-server hadoop201:9092 --describe --all-groups
2.使用 --group 查看指定组
bin/kafka-consumer-groups.sh
--bootstrap-server hadoop201:9092 --describe --group 组名
8、修改topic的分区数
bin/kafka-topics.sh
--bootstrap-server hadoop201:9092 --alter --topic firstTopic --partitions 6