yum -y list java*
yum install java-1.8.0-openjdk.x86_64
[root@liang kafka_2.12-2.4.0]# java -version
openjdk version "1.8.0_232"
OpenJDK Runtime Environment (build 1.8.0_232-b09)
OpenJDK 64-Bit Server VM (build 25.232-b09, mixed mode)
通过yum安装的默认路径为:/usr/lib/jvm
启动消费者时提示 --zookeeper 是一个过时的参数;查阅后才知道,在最新版本中这种启动方式已经被移除了。
最后附上0.90版本之后启动消费者的方法: bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
1019 wget http://mirror.bit.edu.cn/apache/kafka/2.4.0/kafka_2.12-2.4.0.tgz
1021 tar -zxvf kafka_2.12-2.4.0.tgz
1023 cd kafka_2.12-2.4.0
1039 mv kafka_2.12-2.4.0 /alidata/server/
1040 cd /alidata/server/
1042 cd kafka_2.12-2.4.0/
1059 ./bin/zookeeper-server-start.sh ./config/zookeeper.properties 1>/dev/null 2>&1 &
1060 cd config
1063 cp server.properties server.properties.bak
1064 vim server.properties
设置 listeners,监听 127.0.0.1:9092:
listeners=PLAINTEXT://127.0.0.1:9092
1075 ./bin/kafka-server-start.sh config/server.properties 1>/dev/null 2>&1 &
1076 ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --config max.message.bytes=12800000 --config flush.messages=1 --replication-factor 1 --partitions 1 --topic test
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic ipa-order-success
[root@liang kafka_2.12-2.4.0]# ./bin/kafka-topics.sh --list --zookeeper localhost:2181
test
[root@liang kafka_2.12-2.4.0]# ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
>xiao ming
>xiaoli
[root@liang kafka_2.12-2.4.0]# ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
xiao ming
xiaoli
[root@liang kafka_2.12-2.4.0]# cp config/server.properties config/server-1.properties
[root@liang kafka_2.12-2.4.0]# cp config/server.properties config/server-2.properties
Now edit these new files and set the following properties:
config/server-1.properties:
    broker.id=1
    listeners=PLAINTEXT://127.0.0.1:9093
    log.dirs=/tmp/kafka-logs-1
config/server-2.properties:
    broker.id=2
    listeners=PLAINTEXT://127.0.0.1:9094
    log.dirs=/tmp/kafka-logs-2
[root@liang kafka_2.12-2.4.0]# bin/kafka-server-start.sh config/server-1.properties 1>/dev/null 2>&1 &
[root@liang kafka_2.12-2.4.0]# bin/kafka-server-start.sh config/server-2.properties 1>/dev/null 2>&1 &
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 1 --topic my-replicated-topic
使用 describe 命令查看各个 broker 在该 topic 中扮演的角色:
[root@liang kafka_2.12-2.4.0]# bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic my-replicated-topic
Topic: my-replicated-topic PartitionCount: 1 ReplicationFactor: 3 Configs: segment.bytes=1073741824
Topic: my-replicated-topic Partition: 0 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
解释:
由于这个 topic 只有一个分区,所以这里只有一行记录。
"replicas":副本所在的节点列表,无论该节点是否为 leader、目前是否存活,都会显示。
"isr":"同步副本"(in-sync replicas)节点列表,即存活且正在与 leader 保持同步的节点。
[root@liang kafka_2.12-2.4.0]# bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic test
Topic: test PartitionCount: 1 ReplicationFactor: 1 Configs: segment.bytes=1073741824,flush.messages=1,max.message.bytes=12800000
Topic: test Partition: 0 Leader: 0 Replicas: 0 Isr: 0
这并不奇怪,刚才创建的主题没有Replicas,并且在服务器“0”上,我们创建它的时候,集群中只有一个服务器,所以是“0”。
[root@liang kafka_2.12-2.4.0]# bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
>my test message 1
>my test message 2
[root@liang kafka_2.12-2.4.0]# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
my test message 1
my test message 2
下面 测试 集群 容错能力。broker 0 是 leader; 现在 杀死它。
[root@liang kafka_2.12-2.4.0]# netstat -unltp | grep java
tcp 0 0 :::57921 :::* LISTEN 33112/java
tcp 0 0 ::ffff:127.0.0.1:9092 :::* LISTEN 34404/java
tcp 0 0 ::ffff:127.0.0.1:9093 :::* LISTEN 44287/java
tcp 0 0 :::2181 :::* LISTEN 33112/java
tcp 0 0 ::ffff:127.0.0.1:9094 :::* LISTEN 44714/java
tcp 0 0 :::56006 :::* LISTEN 34404/java
tcp 0 0 :::37321 :::* LISTEN 44287/java
tcp 0 0 :::46123 :::* LISTEN 44714/java
[root@liang kafka_2.12-2.4.0]# kill 34404
[root@liang kafka_2.12-2.4.0]# bin/kafka-topics.sh --describe --bootstrap-server localhost:9093 --topic my-replicated-topic
Topic: my-replicated-topic PartitionCount: 1 ReplicationFactor: 3 Configs: segment.bytes=1073741824
Topic: my-replicated-topic Partition: 0 Leader: 1 Replicas: 0,1,2 Isr: 1,2
即便是 leader 已经更换过,消息仍然可以被消费。
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic //报错9092不存在了
[root@liang kafka_2.12-2.4.0]# bin/kafka-console-consumer.sh --bootstrap-server localhost:9093 --from-beginning --topic my-replicated-topic
^CProcessed a total of 0 messages
[root@liang kafka_2.12-2.4.0]# bin/kafka-console-consumer.sh --bootstrap-server localhost:9094 --from-beginning --topic my-replicated-topic
^CProcessed a total of 0 messages
[root@liang kafka_2.12-2.4.0]# bin/kafka-console-producer.sh --broker-list localhost:9093 --topic my-replicated-topic
>cluster message 1
>cluster message 2
[root@liang kafka_2.12-2.4.0]# bin/kafka-console-consumer.sh --bootstrap-server localhost:9094 --from-beginning --topic my-replicated-topic
^CProcessed a total of 0 messages
You have mail in /var/spool/mail/root
[root@liang kafka_2.12-2.4.0]# bin/kafka-server-start.sh config/server.properties 1>/dev/null 2>&1 &
[5] 55025
You have mail in /var/spool/mail/root
[root@liang kafka_2.12-2.4.0]# bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic my-replicated-topic
Topic: my-replicated-topic PartitionCount: 1 ReplicationFactor: 3 Configs: segment.bytes=1073741824
Topic: my-replicated-topic Partition: 0 Leader: 1 Replicas: 0,1,2 Isr: 1,2,0
[root@liang kafka_2.12-2.4.0]# bin/kafka-console-consumer.sh --bootstrap-server localhost:9094 --from-beginning --topic my-replicated-topic
my test message 1
my test message 2
cluster message 1
cluster message 2
cluster message 3
cluster message 4
[root@liang kafka_2.12-2.4.0]# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
my test message 1
my test message 2
cluster message 1
cluster message 2
cluster message 3
cluster message 4
使用 Kafka Connect 导入导出数据。
It is an extensible tool that runs connectors, which implement the custom logic for interacting with an external system
> echo -e "foo\nbar" > test.txt
bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties //报错;
http://kafka.apache.org/documentation/