This article explains how to install Kafka on a single machine.
Kafka is a distributed message queue (MQ) system developed and open-sourced by LinkedIn, and is now a top-level Apache project. Its homepage describes Kafka as a high-throughput distributed MQ (one that can spread messages across different nodes). Kafka is written in only about 7,000 lines of Scala and, reportedly, can produce about 250,000 messages per second (50 MB) and process about 550,000 messages per second (110 MB).
Where is Kafka's official website?
http://kafka.apache.org/
Where do you download it, and what components does it need?
Kafka 0.8.1.1 (built against Scala 2.9.2) can be downloaded from:
https://www.apache.org/dyn/closer.cgi?path=/kafka/0.8.1.1/kafka_2.9.2-0.8.1.1.tgz
First, download JDK 7 (JDK 8 may cause problems):
http://www.oracle.com/technetwork/java/javase/downloads/jdk7-downloads-1880260.html
sudo find / -type d -name jre   # locate the Java path (important)
export JAVA_HOME=/Library/Java/JavaVirtualMachines/jdk1.7.0_80.jdk/Contents/Home/
export CLASSPATH=$JAVA_HOME/jre/lib:$JAVA_HOME/lib
export PATH=$PATH:$JAVA_HOME/bin
Reload the environment variables so they take effect:
source /etc/profile
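To confirm the JDK is picked up (the expected version string assumes the jdk1.7.0_80 path above):

java -version   # should report java version "1.7.0_80"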
Kafka manages its cluster through ZooKeeper.
Although the Kafka package bundles a simplified ZooKeeper, its functionality feels limited. For a production environment, it is recommended to download the official ZooKeeper release directly.
I. Downloading and installing ZooKeeper
1) Download
wget http://mirrors.cnnic.cn/apache/zookeeper/zookeeper-3.4.6/zookeeper-3.4.6.tar.gz
2) Extract
tar zxvf zookeeper-3.4.6.tar.gz
3) Configure
cd zookeeper-3.4.6
cp -rf conf/zoo_sample.cfg conf/zoo.cfg
vim conf/zoo.cfg
zoo.cfg:
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
dataDir=/Users/apple/Documents/soft/zookeeper_soft/zkdata   # create this directory in advance
# the port at which the clients will connect
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1
An annotated configuration for a (pseudo-)cluster:

tickTime=2000                         # heartbeat interval, in milliseconds
initLimit=10                          # at startup, a follower must finish syncing from the leader within 10 heartbeats
syncLimit=5                           # a follower that fails to respond within 5 heartbeats is considered offline
dataDir=/zyxx_data/zookeeper/data00   # directory where ZooKeeper stores its data
clientPort=2181                       # ZooKeeper service port
server.0=192.168.6.56:20881:30881
server.1=192.168.6.56:20882:30882
server.2=192.168.6.56:20883:30883
server.0, server.1, and server.2 list the nodes in the ZooKeeper cluster. A server entry follows the rule server.N=YYY:A:B, where:
N is the server number;
YYY is the server's IP address;
A is the leader-follower communication port, i.e. the port this server uses to exchange information with the cluster leader;
B is the election port, used for inter-server communication when electing a new leader (when the leader goes down, the remaining servers talk to each other to elect a new one).
In general, every server in the cluster uses the same A port and the same B port. In a pseudo-cluster, however, the IP addresses are all identical, so only the A and B ports can differ.
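One step the cluster configuration above implies but does not show: each ZooKeeper instance must have a myid file in its dataDir containing its own number N. A minimal sketch, assuming the three instances use dataDir directories data00, data01, and data02:

echo 0 > /zyxx_data/zookeeper/data00/myid
echo 1 > /zyxx_data/zookeeper/data01/myid
echo 2 > /zyxx_data/zookeeper/data02/myid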
4) Start ZooKeeper
adeMacBook-Pro:bin apple$ sh zkServer.sh start
JMX enabled by default
Using config: /Users/apple/Documents/soft/zookeeper_soft/zookeeper-3.4.6/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
adeMacBook-Pro:bin apple$ ps ax | grep zookeeper.out
10311 s003  S+   0:00.01 grep zookeeper.out
adeMacBook-Pro:bin apple$ ps ax | grep zookeeper
10307 s003  S    0:00.63 /usr/bin/java -Dzookeeper.log.dir=. -Dzookeeper.root.logger=INFO,CONSOLE -cp /Users/apple/Documents/soft/zookeeper_soft/zookeeper-3.4.6/bin/../build/classes:/Users/apple/Documents/soft/zookeeper_soft/zookeeper-3.4.6/bin/../build/lib/*.jar:/Users/apple/Documents/soft/zookeeper_soft/zookeeper-3.4.6/bin/../lib/slf4j-log4j12-1.6.1.jar:/Users/apple/Documents/soft/zookeeper_soft/zookeeper-3.4.6/bin/../lib/slf4j-api-1.6.1.jar:/Users/apple/Documents/soft/zookeeper_soft/zookeeper-3.4.6/bin/../lib/netty-3.7.0.Final.jar:/Users/apple/Documents/soft/zookeeper_soft/zookeeper-3.4.6/bin/../lib/log4j-1.2.16.jar:/Users/apple/Documents/soft/zookeeper_soft/zookeeper-3.4.6/bin/../lib/jline-0.9.94.jar:/Users/apple/Documents/soft/zookeeper_soft/zookeeper-3.4.6/bin/../zookeeper-3.4.6.jar:/Users/apple/Documents/soft/zookeeper_soft/zookeeper-3.4.6/bin/../src/java/lib/*.jar:/Users/apple/Documents/soft/zookeeper_soft/zookeeper-3.4.6/bin/../conf: -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.local.Only=false org.apache.zookeeper.server.quorum.QuorumPeerMain /Users/apple/Documents/soft/zookeeper_soft/zookeeper-3.4.6/bin/../conf/zoo.cfg
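To confirm ZooKeeper is actually serving requests, use its "ruok" four-letter command (a healthy server answers imok) or the status subcommand:

echo ruok | nc localhost 2181   # prints: imok
sh zkServer.sh status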
II. Downloading and installing Kafka (a Scala environment should be set up beforehand; for the Mac, see "Setting up a Scala development environment on macOS")
1) Download Kafka:
wget http://apache.fayea.com/kafka/0.8.2.1/kafka_2.10-0.8.2.1.tgz
2) Extract:
tar -zxf kafka_2.10-0.8.2.1.tgz
3) Start Kafka
adeMacBook-Pro:kafka_2.10-0.8.2.1 apple$ sh bin/kafka-server-start.sh config/server.properties
Note: to run it in the background, use:
sh bin/kafka-server-start.sh config/server.properties &
The "-daemon" flag starts the Kafka server as a daemon:
sh bin/kafka-server-start.sh -daemon config/server.properties
The official docs and most write-ups online start the server without "-daemon", e.g. "bin/kafka-server-start.sh config/server.properties &". Started that way, however, the process may die when you log out of your SSH session: a job backgrounded with & still belongs to the shell's session and can receive SIGHUP when the terminal closes, which "-daemon" avoids by detaching the process.
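If you prefer not to use "-daemon", plain nohup (standard shell, nothing Kafka-specific) achieves the same detachment; the log file name here is arbitrary:

nohup sh bin/kafka-server-start.sh config/server.properties > kafka.log 2>&1 &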
4) Create a topic
adeMacBook-Pro:bin apple$ sh kafka-topics.sh --create --topic kafkatopic --replication-factor 1 --partitions 1 --zookeeper localhost:2181
Note: to run it in the background, use:
sh kafka-topics.sh --create --topic kafkatopic --replication-factor 1 --partitions 1 --zookeeper localhost:2181 &
Creating a topic: producing and consuming data in Kafka must go through a topic; a topic is essentially just a category of messages. Create a topic named "test" with a replication factor of 1 and 1 partition:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
replication-factor: the number of replicas, which provides failover. A value of 1 means the data is recorded on only one broker; in practice the value is usually greater than 1, so that each piece of data is automatically synchronized to several other brokers and a broker crash does not lose data.
partitions: a topic can be split into multiple partitions. One consumer can consume several partitions, but each partition can be consumed by only one consumer (within a group), so adding partitions increases consumer throughput. Kafka only guarantees message order within a single partition; messages across partitions are unordered.
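To check how a topic's partitions and replicas were laid out, kafka-topics.sh also has a --describe mode:

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test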
List the topics that have been created:
bin/kafka-topics.sh --list --zookeeper localhost:2181
Starting the producer and the consumer
The producer generates (inputs) data; the consumer consumes (outputs) it.
5) Start the Kafka producer:
adeMacBook-Pro:bin apple$ sh kafka-console-producer.sh --broker-list localhost:9092 --sync --topic kafkatopic
Note: to run it in the background, use:
sh kafka-console-producer.sh --broker-list localhost:9092 --sync --topic kafkatopic &
6) In another terminal, start the consumer:
adeMacBook-Pro:bin apple$ sh kafka-console-consumer.sh --zookeeper localhost:2181 --topic kafkatopic --from-beginning
Note: to run it in the background, use:
sh kafka-console-consumer.sh --zookeeper localhost:2181 --topic kafkatopic --from-beginning &
7) Use it: lines typed into the producer terminal appear in the consumer terminal.
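A quick smoke test looks like this (the messages are simply whatever you type):

# in the producer terminal, type a line and press Enter:
hello kafka
# the consumer terminal then prints:
hello kafka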
8) Shut down Kafka and ZooKeeper:
cd /Volumes/Untitled/application/kafka_2.10-0.8.2.1/bin
sh kafka-server-stop.sh ../config/server.properties
cd /Volumes/Untitled/application/zookeeper-3.4.6/bin
sh zkServer.sh stop
Takeaways:
1. The console producer is started with Kafka's own port (9092), while the console consumer is started with ZooKeeper's port (2181);
2. A topic must be created before it can be used;
3. Topics are essentially stored on ZooKeeper in the form of file-like nodes (znodes); the message data itself lives on the brokers.
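You can see point 3 for yourself with the ZooKeeper CLI shipped in its bin directory; Kafka 0.8.x keeps its metadata under znodes such as /brokers:

sh zkCli.sh -server localhost:2181
ls /brokers/topics   # e.g. [kafkatopic]
ls /brokers/ids      # ids of the live brokers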
Commands I run on my Mac:
# Start ZooKeeper
cd /Volumes/Untitled/application/zookeeper-3.4.6/bin
sh zkServer.sh start

# Start Kafka
cd /Volumes/Untitled/application/kafka_2.10-0.8.2.1
sh bin/kafka-server-start.sh config/server.properties &

# In another terminal, create the topic
cd /Volumes/Untitled/application/kafka_2.10-0.8.2.1/bin
sh kafka-topics.sh --create --topic kafkatopic --replication-factor 1 --partitions 1 --zookeeper localhost:2181 &

# In another terminal, start the producer
cd /Volumes/Untitled/application/kafka_2.10-0.8.2.1/bin
sh kafka-console-producer.sh --broker-list localhost:9092 --sync --topic kafkatopic   # no need to background it while testing

# In another terminal, start the consumer
cd /Volumes/Untitled/application/kafka_2.10-0.8.2.1/bin
sh kafka-console-consumer.sh --zookeeper localhost:2181 --topic kafkatopic --from-beginning

# Stop Kafka and ZooKeeper
# Either kill the processes directly (see kill's signal handling: signals 1, 2, 9, 15)
ps -ef | grep kafka- | grep -v grep | awk '{print $2}' | xargs kill -9

# Or use the stop scripts
cd /Volumes/Untitled/application/kafka_2.10-0.8.2.1/bin
sh kafka-server-stop.sh ../config/server.properties
cd /Volumes/Untitled/application/zookeeper-3.4.6/bin
sh zkServer.sh stop
Finally, here is how the consumer is started in versions 0.9.0 and later:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
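Likewise, in much newer releases (Kafka 2.2 and later), kafka-topics.sh talks to the brokers directly instead of to ZooKeeper:

bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test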
Install the Python Kafka client:
python -m pip install kafka-python
Important: map this machine's hostname in /etc/hosts. Check the current hostname with the hostname command, then add an entry mapping 127.0.0.1 to that name.
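For example, given the shell prompt shown earlier, the entry might look like the following (the exact hostname is an assumption based on that prompt; use whatever hostname prints):

127.0.0.1    adeMacBook-Pro.local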
Use the topic created above (kafkatopic).
Producer (the script below also defines a matching consumer class):
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import json

from kafka import KafkaProducer
from kafka import KafkaConsumer
from kafka.errors import KafkaError


class Kafka_producer():
    '''Producer module based on kafka-python.'''

    def __init__(self, kafkahost, kafkaport, kafkatopic):
        self.kafkaHost = kafkahost
        self.kafkaPort = kafkaport
        self.kafkatopic = kafkatopic
        self.producer = KafkaProducer(bootstrap_servers='{kafka_host}:{kafka_port}'.format(
            kafka_host=self.kafkaHost,
            kafka_port=self.kafkaPort,
        ))

    def sendjsondata(self, params):
        try:
            params_message = json.dumps(params)
            producer = self.producer
            producer.send(self.kafkatopic, params_message.encode('utf-8'))
            producer.flush()
        except KafkaError as e:
            print(e)


class Kafka_consumer():
    '''Consumer module based on kafka-python.'''

    def __init__(self, kafkahost, kafkaport, kafkatopic, groupid):
        self.kafkaHost = kafkahost
        self.kafkaPort = kafkaport
        self.kafkatopic = kafkatopic
        self.groupid = groupid
        self.consumer = KafkaConsumer(self.kafkatopic,
                                      group_id=self.groupid,
                                      bootstrap_servers='{kafka_host}:{kafka_port}'.format(
                                          kafka_host=self.kafkaHost,
                                          kafka_port=self.kafkaPort,
                                      ))

    def consume_data(self):
        try:
            for message in self.consumer:
                # print(json.loads(message.value))
                yield message
        except KeyboardInterrupt as e:
            print(e)


def main():
    '''Test the producer and the consumer.'''
    # Test the producer.
    producer = Kafka_producer("localhost", 9092, "kafkatopic")
    for id in range(10):
        params = '{abetst}:{null}---1111' + str(id)
        producer.sendjsondata(params)

    # Test the consumer. Each record is returned as a ConsumerRecord, e.g.:
    # ConsumerRecord(topic=u'ranktest', partition=0, offset=202, timestamp=None,
    #     timestamp_type=None, key=None, value='"{abetst}:{null}---0"', checksum=-1868164195,
    #     serialized_key_size=-1, serialized_value_size=21)
    # consumer = Kafka_consumer('127.0.0.1', 9092, "ranktest", 'test-python-ranktest')
    # consumer = Kafka_consumer('127.0.0.1', 9092, "kafkatopic", 'test-python-ranktest')
    # message = consumer.consume_data()
    # for i in message:
    #     print(i.value)


if __name__ == '__main__':
    main()
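As a side note, kafka-python can also do the JSON encoding for you through a value_serializer callback; a minimal sketch, assuming the same localhost broker and the kafkatopic topic from above:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import json
from kafka import KafkaProducer

# Let kafka-python serialize Python objects to JSON bytes on every send.
producer = KafkaProducer(
    bootstrap_servers='localhost:9092',
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
)
producer.send('kafkatopic', {'id': 1, 'msg': 'hello'})
producer.flush()  # block until the message is actually delivered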
Consumer: switching to a different consumer group lets you re-read all of the data from the beginning.
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import json

from pykafka import KafkaClient

# client = KafkaClient(hosts="192.168.1.1:9092, 192.168.1.2:9092")  # the key point: multiple brokers can be given
client = KafkaClient(hosts="127.0.0.1:9092")  # the key point: multiple brokers can be given
print(client.topics)  # all topics

topic = client.topics['kafkatopic']

# Producer
# producer = topic.get_producer()
# producer.produce(['test message ' + str(i ** 2) for i in range(4)])  # added str(); the official example fails on py2.7

# Consumer
# balanced_consumer = topic.get_balanced_consumer(
#     consumer_group='testgroup',
#     auto_commit_enable=True,  # when set to False, consumer_group is not required
#     zookeeper_connect='myZkClusterNode1.com:2181,myZkClusterNode2.com:2181/myZkChroot'  # multiple ZK nodes can be given here
# )
balanced_consumer = topic.get_balanced_consumer(
    consumer_group="mykafka2",
    auto_commit_enable=True,
    zookeeper_connect="127.0.0.1:2181"
)

# partition = balanced_consumer.partitions[0]
# offset = partition.latest_available_offset() - 10
# balanced_consumer.reset_offsets(((partition, offset),))
# print(balanced_consumer.commit_offsets())
# print(balanced_consumer)

for message in balanced_consumer:
    if message is None:
        continue
    try:
        msg = json.loads(message.value)
        print(msg)
    except Exception as e:
        print(message.value)