热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

(单机安装kafka)mac安装jdkzookeeperkafkapythonkafka模块

此处讲解单机安装kafka kafka是LinkedIn开发并开源的一个分布式MQ系统,现在是Apache的一个孵化项目。在它的主页描述kafka为一个高吞吐量的分布式(能将消息分

此处讲解单机安装kafka  

  kafka是LinkedIn开发并开源的一个分布式MQ系统,现在是Apache的一个孵化项目。在它的主页描述kafka为一个高吞吐量的分布式(能将消息分散到不同的节点上)MQ。Kafka仅仅由7000行Scala编写,据了解,Kafka每秒可以生产约25万消息(50 MB),每秒处理55万消息(110 MB)

kafka的官方网站在哪里?
  http://kafka.apache.org/

在哪里下载?需要哪些组件的支持?
  kafka2.9.2在下面的地址可以下载:
  https://www.apache.org/dyn/closer.cgi?path=/kafka/0.8.1.1/kafka_2.9.2-0.8.1.1.tgz

 

 

首先下载jdk7  jdk8可能有点问题

http://www.oracle.com/technetwork/java/javase/downloads/jdk7-downloads-1880260.html

sudo find / -type d -name jre  # 查找java路径  重要

export JAVA_HOME=/Library/Java/JavaVirtualMachines/jdk1.7.0_80.jdk/Contents/Home/
export CLASSPATH=$JAVA_HOME/jie/lib:$JAVA_HOME/lib
export PATH=$PATH:$JAVA_HOME/BIN

加载环境变量使其生效

source /etc/profile

  

zookeeper搭建

kafka是通过zookeeper来管理集群。
kafka软件包内虽然包括了一个简版的zookeeper,但是感觉功能有限。在生产环境下,建议还是直接下载官方zookeeper软件。

 

一.zookeeper下载与安装

1)下载

wget http://mirrors.cnnic.cn/apache/zookeeper/zookeeper-3.4.6/zookeeper-3.4.6.tar.gz

2)解压

tar zxvf zookeeper-3.4.6.tar.gz

3)配置

cd zookeeper-3.4.6

cp -rf conf/zoo_sample.cfg conf/zoo.cfg
vim conf/zoo.cfg

  

zoo.cfg:

# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial 
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between 
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just 
# example sakes.
dataDir=/Users/apple/Documents/soft/zookeeper_soft/zkdata #这个目录是预先创建的
# the port at which the clients will connect
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the 
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1

  

tickTime=2000 #心跳时间,单位:毫秒
initLimit=10 #Follower在启动时需要在10个心跳时间内从Leader同步数据
syncLimit=5 #超过5个心跳时间收不到Follower的响应,就认为此Follower已经下线
dataDir=/zyxx_data/zookeeper/data00 #zookeeper存储数据的目录
clientPort=2181 #zookeeper服务端口
server.0=192.168.6.56:20881:30881
server.1=192.168.6.56:20882:30882
server.2=192.168.6.56:20883:30883

  

server.0、server.1、server.2 是指整个zookeeper集群内的节点列表。server的配置规则为:server.N=YYY:A:B
N表示服务器编号
YYY表示服务器的IP地址
A为LF通信端口,表示该服务器与集群中的leader交换的信息的端口。
B为选举端口,表示选举新leader时服务器间相互通信的端口(当leader挂掉时,其余服务器会相互通信,选择出新的leader)
一般来说,集群中每个服务器的A端口都是一样,每个服务器的B端口也是一样。但是当所采用的为伪集群时,IP地址都一样,只能是A端口和B端口不一样。

  

4)启动zookeeper

 

adeMacBook-Pro:bin apple$ sh zkServer.sh start
JMX enabled by default
Using config: /Users/apple/Documents/soft/zookeeper_soft/zookeeper-3.4.6/bin/../conf/zoo.cfg
-n Starting zookeeper ... 
STARTED
adeMacBook-Pro:bin apple$ ps ax| grep zookeeper.out 
10311 s003  S+     0:00.01 grep zookeeper.out
adeMacBook-Pro:bin apple$ ps ax| grep zookeeper
10307 s003  S      0:00.63 /usr/bin/java -Dzookeeper.log.dir=. -Dzookeeper.root.logger=INFO,CONSOLE -cp /Users/apple/Documents/soft/zookeeper_soft/zookeeper-3.4.6/bin/../build/classes:/Users/apple/Documents/soft/zookeeper_soft/zookeeper-3.4.6/bin/../build/lib/*.jar:/Users/apple/Documents/soft/zookeeper_soft/zookeeper-3.4.6/bin/../lib/slf4j-log4j12-1.6.1.jar:/Users/apple/Documents/soft/zookeeper_soft/zookeeper-3.4.6/bin/../lib/slf4j-api-1.6.1.jar:/Users/apple/Documents/soft/zookeeper_soft/zookeeper-3.4.6/bin/../lib/netty-3.7.0.Final.jar:/Users/apple/Documents/soft/zookeeper_soft/zookeeper-3.4.6/bin/../lib/log4j-1.2.16.jar:/Users/apple/Documents/soft/zookeeper_soft/zookeeper-3.4.6/bin/../lib/jline-0.9.94.jar:/Users/apple/Documents/soft/zookeeper_soft/zookeeper-3.4.6/bin/../zookeeper-3.4.6.jar:/Users/apple/Documents/soft/zookeeper_soft/zookeeper-3.4.6/bin/../src/java/lib/*.jar:/Users/apple/Documents/soft/zookeeper_soft/zookeeper-3.4.6/bin/../conf: -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.local.Only=false org.apache.zookeeper.server.quorum.QuorumPeerMain /Users/apple/Documents/soft/zookeeper_soft/zookeeper-3.4.6/bin/../conf/zoo.cfg

  

二 下载并且安装kafka(预先得安装配置好scala的环境,Mac环境参照:mac平台scala开发环境搭建)

1).下载kafka:

1).下载kafka:

wget http://apache.fayea.com/kafka/0.8.2.1/kafka_2.10-0.8.2.1.tgz

2) 解压:

tar -zxf kafka_2.10-0.8.2.1.tgz

3)启动kafka

  adeMacBook-Pro:kafka_2.10-0.8.2.1 apple$ sh bin/kafka-server-start.sh config/server.properties

备注:要挂到后台使用:

sh bin/kafka-server-start.sh config/server.properties &

"-daemon" 参数代表以守护进程的方式启动kafka server。
sh bin/kafka-server-start.sh config/server.properties --daemon
官网及网上大多给的启动命令是没有"-daemon"参数,如:“bin/kafka-server-start.sh config/server.properties &”,但是这种方式启动后,如果用户退出的ssh连接,进程就有可能结束,具体不清楚为什么。

  

4)新建一个TOPIC

adeMacBook-Pro:bin apple$ sh kafka-topics.sh --create --topic kafkatopic --replication-factor 1 --partitions 1 --zookeeper localhost:2181

备注:要挂到后台使用:

sh kafka-topics.sh --create --topic kafkatopic --replication-factor 1 --partitions 1 --zookeeper localhost:2181 &

创建主题
kafka生产和消费数据,必须基于主题topic。主题其实就是对消息的分类。
创建主题:名称为“test”、复制数目为1、partitions为1的topic主题
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

replication-factor : 复制数目,提供failover机制;1代表只在一个broker上有数据记录,一般值都大于1,代表一份数据会自动同步到其他的多个broker,防止某个broker宕机后数据丢失。
partitions : 一个topic可以被切分成多个partitions,一个消费者可以消费多个partitions,但一个partitions只能被一个消费者消费,所以增加partitions可以增加消费者的吞吐量。kafka只保证一个partitions内的消息是有序的,多个一个partitions之间的数据是无序的。

  查看已经创建的主题

bin/kafka-topics.sh --list --zookeeper localhost:2181

  

启动生产者和消费者

生产者产生(输入)数据,消费者消费(输出)数据

 

5) 把KAFKA的生产者启动起来:

adeMacBook-Pro:bin apple$ sh kafka-console-producer.sh --broker-list localhost:9092 --sync --topic kafkatopic

备注:要挂到后台使用:

sh kafka-console-producer.sh --broker-list localhost:9092 --sync --topic kafkatopic &

6)另开一个终端,把消费者启动起来:

adeMacBook-Pro:bin apple$ sh kafka-console-consumer.sh --zookeeper localhost:2181 --topic kafkatopic --from-beginning

备注:要挂到后台使用:

sh kafka-console-consumer.sh --zookeeper localhost:2181 --topic kafkatopic --from-beginning &

7)使用

  1. 在发送消息的终端输入aaa,则可以在消费消息的终端显示,如下图所示:

(单机安装kafka) mac 安装jdk zookeeper kafka   python kafka模块

 

 6.关闭kafka和zookeeper :

cd /Volumes/Untitled/application/kafka_2.10-0.8.2.1/bin
sh kafka-server-stop.sh ../config/server.properties

cd /Volumes/Untitled/application/zookeeper-3.4.6/bin
sh zkServer.sh stop

  

心得总结:  
1.produce启动的时候参数使用的是kafka的端口而consumer启动的时候使用的是zookeeper的端口;  
2.必须先创建topic才能使用;  
3.topic本质是以文件的形式储存在zookeeper上的。  

 

我的mac执行命令

启动zookeeper
cd /Volumes/Untitled/application/zookeeper-3.4.6/bin
sh zkServer.sh start


启动kafka
cd /Volumes/Untitled/application/kafka_2.10-0.8.2.1
sh bin/kafka-server-start.sh config/server.properties &

另一个终端创建主题
cd /Volumes/Untitled/application/kafka_2.10-0.8.2.1/bin
sh kafka-topics.sh --create --topic kafkatopic --replication-factor 1 --partitions 1 --zookeeper localhost:2181 &

另一个终端创建生产者
cd /Volumes/Untitled/application/kafka_2.10-0.8.2.1/bin
sh kafka-console-producer.sh --broker-list localhost:9092 --sync --topic kafkatopic # 测试的时候可以不用后台


另一个终端创建消费者
cd /Volumes/Untitled/application/kafka_2.10-0.8.2.1/bin
sh kafka-console-consumer.sh --zookeeper localhost:2181 --topic kafkatopic --from-beginning

停止kafka zookeeper
或者 ps -ef |grep kafka-|grep -v grep |xargs kill -9 # 详见kil的信号处理 1 2 9 15 cd /Volumes/Untitled/application/kafka_2.10-0.8.2.1/bin sh kafka-server-stop.sh ../config/server.properties cd /Volumes/Untitled/application/zookeeper-3.4.6/bin sh zkServer.sh stop

  最后附上0.90版本之后启动消费者的方法: bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning(单机安装kafka) mac 安装jdk zookeeper kafka   python kafka模块

 

 

安装kafka 

python -m pip install kafka

 

重要:配置本机主机名在 /etc/hosts 文件  

命令查看 hostname

127.0.0.1 对应的主机名

 

topic 用上面的

生产者:

#!/usr/bin/env python
# _*_coding:utf-8_*_
from kafka import KafkaProducer
from kafka import KafkaConsumer
from kafka.errors import KafkaError
import json


class Kafka_producer():
    '''
    使用kafka的生产模块
    '''

    def __init__(self, kafkahost, kafkaport, kafkatopic):
        self.kafkaHost = kafkahost
        self.kafkaPort = kafkaport
        self.kafkatopic = kafkatopic
        self.producer = KafkaProducer(bootstrap_servers='{kafka_host}:{kafka_port}'.format(
            kafka_host=self.kafkaHost,
            kafka_port=self.kafkaPort,
            ))

    def sendjsondata(self, params):
        try:
            parmas_message = json.dumps(params)
            producer = self.producer
            producer.send(self.kafkatopic, parmas_message.encode('utf-8'))
            producer.flush()
        except KafkaError as e:
            print e


class Kafka_consumer():
    '''
    使用Kafka—python的消费模块
    '''

    def __init__(self, kafkahost, kafkaport, kafkatopic, groupid):
        self.kafkaHost = kafkahost
        self.kafkaPort = kafkaport
        self.kafkatopic = kafkatopic
        self.groupid = groupid
        self.consumer = KafkaConsumer(self.kafkatopic, group_id = self.groupid,
                                      bootstrap_servers = '{kafka_host}:{kafka_port}'.format(
            kafka_host=self.kafkaHost,
            kafka_port=self.kafkaPort ))

    def consume_data(self):
        try:
            for message in self.consumer:
                # print json.loads(message.value)
                yield message
        except KeyboardInterrupt, e:
            print e


def main():
    '''
    测试consumer和producer
    :return:
    '''
    ##测试生产模块
    producer = Kafka_producer("localhost", 9092, "kafkatopic")
    for id in range(10):
       params = '{abetst}:{null}---1111' + str(id)
       producer.sendjsondata(params)
    ##测试消费模块
    #消费模块的返回格式为ConsumerRecord(topic=u'ranktest', partition=0, offset=202, timestamp=None,
    #\timestamp_type=None, key=None, value='"{abetst}:{null}---0"', checksum=-1868164195,
    #\serialized_key_size=-1, serialized_value_size=21)
    # consumer = Kafka_consumer('127.0.0.1', 9092, "ranktest", 'test-python-ranktest')
    # consumer = Kafka_consumer('127.0.0.1', 9092, "kafkatopic", 'test-python-ranktest')
    # message = consumer.consume_data()
    # for i in message:
    #     print i.value


if __name__ == '__main__':
    main()

 

 

消费者  consumergroup 换一个 就能重新拿全部数据

#!/usr/bin/env python
# -*- coding: utf-8 -*-


import json
from pykafka import KafkaClient
# client = KafkaClient(hosts="192.168.1.1:9092, 192.168.1.2:9092") # 可接受多个Client这是重点
client = KafkaClient(hosts="127.0.0.1:9092") # 可接受多个Client这是重点
print client.topics  # 所有topic 
topic = client.topics['kafkatopic']
# 生产者
# producer = topic.get_producer()
# producer.produce(['test message ' + str(i ** 2) for i in range(4)]) # 加了个str官方的例子py2.7跑不过

# 消费者
# balanced_consumer = topic.get_balanced_consumer(
#     consumer_group='testgroup',
#     auto_commit_enable=True,  # 设置为Flase的时候不需要添加 consumer_group
#     zookeeper_connect='myZkClusterNode1.com:2181,myZkClusterNode2.com:2181/myZkChroot' # 这里就是连接多个zk
# )

balanced_consumer = topic.get_balanced_consumer(
    consumer_group="mykafka2",
    auto_commit_enable=True,
    zookeeper_connect="127.0.0.1:2181"
)

# partition = balanced_consumer.partitions[0]
# offset = partition.latest_available_offset() - 10
# balanced_consumer.reset_offsets(((partition, offset),))
# print balanced_consumer.commit_offsets()
# print balanced_consumer
for message in balanced_consumer:
    if message is None:
        continue
    try:
        msg = json.loads(message.value)
        print msg
    except Exception as e:
        print message.value

 


推荐阅读
  • 基于Web的Kafka管理工具Kafkamanager首次访问Web界面的详细配置指南(附图解)
    首次访问Kafkamanager Web界面时,需要对Kafka集群进行配置。这一过程相对简单,用户只需依次点击【Cluster】>【Add Cluster】,按照提示完成相关设置即可。本文将通过图文并茂的方式,详细介绍每一步的配置步骤,帮助用户快速上手Kafkamanager。 ... [详细]
  • 流处理中的计数挑战与解决方案
    本文探讨了在流处理中进行计数的各种技术和挑战,并基于作者在2016年圣何塞举行的Hadoop World大会上的演讲进行了深入分析。文章不仅介绍了传统批处理和Lambda架构的局限性,还详细探讨了流处理架构的优势及其在现代大数据应用中的重要作用。 ... [详细]
  • 本文探讨了 Kafka 集群的高效部署与优化策略。首先介绍了 Kafka 的下载与安装步骤,包括从官方网站获取最新版本的压缩包并进行解压。随后详细讨论了集群配置的最佳实践,涵盖节点选择、网络优化和性能调优等方面,旨在提升系统的稳定性和处理能力。此外,还提供了常见的故障排查方法和监控方案,帮助运维人员更好地管理和维护 Kafka 集群。 ... [详细]
  • 本文由公众号【数智物语】(ID: decision_engine)发布,关注获取更多干货。文章探讨了从数据收集到清洗、建模及可视化的全过程,介绍了41款实用工具,旨在帮助数据科学家和分析师提升工作效率。 ... [详细]
  • 将XML数据迁移至Oracle Autonomous Data Warehouse (ADW)
    随着Oracle ADW的推出,数据迁移至ADW成为业界关注的焦点。特别是XML和JSON这类结构化数据的迁移需求日益增长。本文将通过一个实际案例,探讨如何高效地将XML数据迁移至ADW。 ... [详细]
  • 本文详细探讨了如何根据不同的应用场景选择合适的PHP版本,包括多版本切换技巧、稳定性分析及针对WordPress等特定平台的版本建议。 ... [详细]
  • 搭建个人博客:WordPress安装详解
    计划建立个人博客来分享生活与工作的见解和经验,选择WordPress是因为它专为博客设计,功能强大且易于使用。 ... [详细]
  • HBase 数据复制与灾备同步策略
    本文探讨了HBase在企业级应用中的数据复制与灾备同步解决方案,包括存量数据迁移及增量数据实时同步的方法。 ... [详细]
  • 从0到1搭建大数据平台
    从0到1搭建大数据平台 ... [详细]
  • 阿里巴巴终面技术挑战:如何利用 UDP 实现 TCP 功能?
    在阿里巴巴的技术面试中,技术总监曾提出一道关于如何利用 UDP 实现 TCP 功能的问题。当时回答得不够理想,因此事后进行了详细总结。通过与总监的进一步交流,了解到这是一道常见的阿里面试题。面试官的主要目的是考察应聘者对 UDP 和 TCP 在原理上的差异的理解,以及如何通过 UDP 实现类似 TCP 的可靠传输机制。 ... [详细]
  • Kafka 是由 Apache 软件基金会开发的高性能分布式消息系统,支持高吞吐量的发布和订阅功能,主要使用 Scala 和 Java 编写。本文将深入解析 Kafka 的安装与配置过程,为程序员提供详尽的操作指南,涵盖从环境准备到集群搭建的每一个关键步骤。 ... [详细]
  • 在CentOS系统中部署与配置ZooKeeper详解 ... [详细]
  • 第二章:Kafka基础入门与核心概念解析
    本章节主要介绍了Kafka的基本概念及其核心特性。Kafka是一种分布式消息发布和订阅系统,以其卓越的性能和高吞吐量而著称。最初,Kafka被设计用于LinkedIn的活动流和运营数据处理,旨在高效地管理和传输大规模的数据流。这些数据主要包括用户活动记录、系统日志和其他实时信息。通过深入解析Kafka的设计原理和应用场景,读者将能够更好地理解其在现代大数据架构中的重要地位。 ... [详细]
  • Kafka核心理论问题汇编【持续更新中】
    本文汇总了Kafka的核心理论问题,涵盖了常见的技术难点和解决方案。内容将持续更新,旨在为开发者提供全面的参考。文章源自博客园,作者呱嗒呱嗒,转载时请注明出处。 ... [详细]
  • HTML 表格与列表标签详解
    本文详细介绍了HTML中的表格标签和列表标签的使用方法,包括如何创建表格的不同部分(如标题、头部、主体和脚部),以及如何利用这些标签进行数据展示和布局设计。 ... [详细]
author-avatar
撒大时代撒的照写真
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有